首页 文章

使用dask加载大型数据集

提问于
浏览 137
2

我处于HPC环境中,具有群集,紧密耦合的互连以及支持Lustre文件系统 . 我们一直在探索如何利用Dask不仅提供计算,还充当分布式缓存以加速我们的工作流程 . 我们的专有数据格式是n维和常规的,我们编写了一个惰性读者来传递给from_array / from_delayed方法 .

我们在Dask集群中加载和持久存储大于内存的数据集时遇到了一些问题 .

hdf5示例:

# Dask scheduler has been started and connected to 8 workers
# spread out on 8 machines, each with --memory-limit=150e9.
# File locking for reading hdf5 is also turned off
from dask.distributed import Client
c = Client({ip_of_scheduler})
import dask.array as da
import h5py
hf = h5py.File('path_to_600GB_hdf5_file', 'r')
ds = hf[hf.keys()[0]]
x = da.from_array(ds, chunks=(100, -1, -1))
x = c.persist(x) # takes 40 minutes, far below network and filesystem capabilities
print x[300000,:,:].compute() # works as expected

我们还从一些自己的文件文件格式中加载了数据集(使用切片,dask.delayed和from_delayed),并且随着文件大小的增加,性能也出现了类似的下降 .

我的问题:使用Dask作为分布式缓存是否存在固有的瓶颈?是否会强制所有数据通过调度程序进行漏斗? Worker 是否能够利用Lustre,或者以某种方式序列化功能和/或I / O?如果是这种情况,那么在大量数据集上调用持久化并让Dask在需要时处理数据和计算会更有效吗?

1 回答

  • 1
    • 使用Dask作为分布式缓存是否存在固有的瓶颈?

    每个系统都存在瓶颈,但听起来你似乎没有遇到我对Dask期望的瓶颈 . 我怀疑你正在遇到其他问题 .

    • 是否会强制所有数据通过调度程序进行漏斗?

    不,工作人员可以执行自己加载数据的功能 . 然后,这些数据将留在 Worker 身上 .

    • Worker 是否能够利用Lustre,或者以某种方式序列化功能和/或I / O?

    工作者只是Python进程,所以如果在集群上运行的Python进程可以利用Lustre(几乎可以肯定是这种情况)那么是的,Dask Workers可以利用Lustre .

    • 如果是这种情况,那么在大量数据集上调用persist并让Dask在需要时处理数据和计算会更有效吗?

    这当然很常见 . 这里的权衡是在NFS的分布式带宽和分布式内存的可用性之间 .

    在你的位置,我会使用Dask的诊断来弄清楚占用了多少时间 . 您可能希望仔细阅读understanding performance上的文档和dashboard上的部分 . 该部分的视频可能特别有用 . 我会问两个问题:

    • Worker 是否一直在运行任务? (状态页面,任务流图)

    • 在这些任务中,什么是占用时间? ( Profiles 页)

相关问题