这是this question的后续行动 .

我在分布式内存中持久保存大型数据集时遇到了问题 . 我有一台运行在一台机器上的调度程序和8台工作人员,每台工作人员在自己的机器上运行,由40千兆位以太网和一个支持Lustre文件系统连接 .

问题1:

ds = DataSlicer(dataset) # ~600 GB dataset
dask_array = dask.array.from_array(ds, chunks=(13507, -1, -1), name=False) # ~22 GB chunks
dask_array = client.persist(dask_array)

在检查Dask状态仪表板时,我看到所有28个任务都由一个工作人员分配和处理,而其他工作人员什么都不做 . 此外,当每个任务完成处理并且任务都处于“内存中”状态时,只有22 GB的RAM(即数据集的第一个块)实际存储在群集上 . 对第一个块内的索引的访问速度很快,但是任何其他索引都会强制进行新一轮的读取并在结果返回之前加载数据 . 这似乎与我的观点相反,即.persist()应该在完成执行后将整个数据集固定在工作人员的内存中 . 此外,当我增加块大小时,一个工作程序经常耗尽内存并重新启动,因为它被分配了多个巨大的数据块 .

有没有办法手动将块分配给工作人员而不是在一个进程上堆积所有任务的调度程序?或者是这种异常的调度程序行为?有没有办法确保将整个数据集加载到RAM中?

问题2

我发现了一个临时的解决方法,将数据集的每个块视为自己独立的dask数组并单独保存每个数据集 .

dask_arrays = [da.from_delayed(lazy_slice, shape, dtype, name=False) for \
               lazy_slice, shape in zip(lazy_slices, shapes)]
for i in range(len(dask_arrays)):
    dask_arrays[i] = client.persist(dask_arrays[i])

我通过在数据集的不同块上并行调用.compute()来测试从持久和已发布的dask数组到几个并行读取器的带宽 . 我无法从dask集群获得超过2 GB / s的聚合带宽,远低于我们网络的功能 .

调度程序是否是这种情况下的瓶颈,即是否所有数据都通过调度程序传递给我的读者?如果是这种情况,有没有办法直接从每个工作人员获取内存数据?如果不是这种情况,我可以调查dask中的其他一些方面吗?