首页 文章

以块的形式创建xarray DataArray并将其写入NetCDF

提问于
浏览
1

是否也可以创建一个核心外的DataArray,并使用xarray将其逐块编写到NetCDF4文件中?

例如,我希望能够在尺寸更大的情况下以非核心方式执行此操作,因此我无法将整个数组存储在内存中:

num_steps = 20
num_times = 100
#Create DataArray
d = xr.DataArray(np.zeros([num_steps, num_times], np.float32),
                 {'Step': np.arange(num_steps),
                  'Time': np.arange(num_times)},
                 ('Step', 'Time'))
#Computatation
for i in range(num_steps):
    d[i, :] = i
#Write to file
d.to_netcdf('test.nc')

所以我不想在内存中创建整个NumPy数组,我希望Computation和Write to files阶段一次完成一个块(在本例中以Step维度为块) .

更新:似乎(来自@ jhamman的回答)可能无法使用xarray实现上面的示例 . 我主要感兴趣的是用xarray更好地理解核外计算,所以我没有具体的计算,但是,因为我被要求一个更复杂的例子,一个潜在的应用我有:是

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[i, :] = u[:] * s[:]

其中 us 是维度Time的xr.DataArrays,而 fg 是仅依赖于上一步骤的输入数组的PDE求解器 . 假设有1000个步骤,但Time维数太大,我只能在内存中存储一个或两个,因此必须将 d 的赋值写入磁盘,然后释放相关的内存 .

2 回答

  • 1

    是的,xarray支持核外数组并以块的形式写入 . 您需要使用xarray操作和Dask数组而不是NumPy数组来编写计算 . xarray docs在这里应该会有所帮助 .

    Update :对于这样的模拟,您需要使用dask.delayed计算每个函数 f . 然后你可以用 dask.array.from_delayed 转换dask数组中的结果,将它们包装回 xarray.DataArray 并用 to_netcdf() 将数据直接写入磁盘 . 结果以流式方式进行,并行计算 f()g() 并且不超过几个时间步加载到内存中:

    import dask
    import dask.array as da
    import numpy as np
    import xarray
    
    def f(x):
        return 1.1 * x
    
    def g(x):
        return 0.9 * x
    
    num_steps = 1000
    num_times = int(1e6)
    
    u = np.ones(num_times)
    s = np.ones(num_times)
    
    arrays = []
    for i in range(num_steps):
        u = dask.delayed(f)(u)
        s = dask.delayed(g)(s)
        product = da.from_delayed(u * s, shape=(num_times,), dtype=float)
        arrays.append(product)
    
    stacked = da.stack(arrays)
    data_array = xarray.DataArray(stacked, dims=['step', 'time'])
    %time data_array.to_netcdf('results.nc')
    # CPU times: user 7.44 s, sys: 13.5 s, total: 20.9 s
    # Wall time: 29.4 s
    

    您会注意到xarray非常适合这种计算:大部分计算都是使用dask / numpy完成的 . 您也可以使用xarray对象轻松完成此操作,但我们没有方便的方法通过dask延迟对象传递带标签的数组元数据,因此无论哪种方式,您都需要在另一侧重建元数据 .

    你可以争辩说,在这里使用dask是过度的,你可能是对的 . 即使您想使用dask进行并行化,您仍然可能希望在每个步骤之后以有效的netCDF文件的形式检查模拟 .

    因此,在每次迭代时扩展netCDF文件的简单循环可能是您想要的 . 这是xarray的not yet supported,但这将是一个很好的功能 . 应该可以使用以下界面:

    for i in range(num_steps):
        u[:] = f(u)
        s[:] = g(s)
        d[:] = u[:] * s[:]
        d.to_netcdf('results.nc', extend='step')
    

    在此期间,您可以为每个步骤编写单独的文件,例如,

    for i in range(num_steps):
        u[:] = f(u)
        s[:] = g(s)
        d[:] = u[:] * s[:]
        d.to_netcdf('results-%04d.nc' % i)
    

    然后,您可以将所有数据加载到一起,然后使用 open_mfdataset 将其合并到一个文件中,例如,

    combined = xarray.open_mfdataset('results-*.nc', autoclose=True)
    combined.to_netcdf('results-combined.nc')
    
  • 2

    Dask数组当前不支持项目分配,请参阅Item assignment to Python dask array objects .

    因此,如果 d 是一个带有dask.array的 xarray.DataArray ,这将无效 .

    此外,当前的Xarray后端都没有支持分块写入 . 编辑:正如@shoyer指出的那样,有可能让xarray以递增方式编写分块数组 . 但是,对于您的用例,由于您似乎需要项目分配,可能需要直接使用 netCDF4-python 库:

    from netCDF4 import Dataset
    
    f = Dataset('test.nc', mode='w')
    f.createDimension("Step", nsteps)
    f.createDimension("time", ntimes)
    d = f.createVariable("d", "f4",("Step", "time"))
    
    #Computatation
    for i in range(num_steps):
        d[i, :] = i
    

    我假设你的计算比你的例子更复杂,所以你可能会考虑用使用xarray / dask的东西替换 = i .

相关问题