首页 文章

从延迟集合创建大型dask.dataframe时出现Killed / MemoryError

提问于
浏览
5

我试图从一堆大型CSV文件(目前有12个文件,每个8-10万行和50列)创建一个dask.dataframe . 其中一些可能会合并到我的系统内存中,但它们一次肯定不会,因此使用dask而不是常规的pandas .

由于读取每个csv文件涉及一些额外的工作(使用文件路径中的数据添加列),我尝试从延迟对象列表创建dask.dataframe,类似to this example .

这是我的代码:

import dask.dataframe as dd
from dask.delayed import delayed
import os
import pandas as pd

def read_file_to_dataframe(file_path):
    df = pd.read_csv(file_path)
    df['some_extra_column'] = 'some_extra_value'
    return df

if __name__ == '__main__':
    path = '/path/to/my/files'
    delayed_collection = list()
    for rootdir, subdirs, files in os.walk(path):
        for filename in files:
            if filename.endswith('.csv'):
                file_path = os.path.join(rootdir, filename)
                delayed_reader = delayed(read_file_to_dataframe)(file_path)
                delayed_collection.append(delayed_reader)

    df = dd.from_delayed(delayed_collection)
    print(df.compute())

当启动这个脚本(Python 3.4,dask 0.12.0)时,它运行了几分钟,而我的系统内存不断填满 . 当它被完全使用时,一切都开始滞后并且运行了几分钟,然后它与 killedMemoryError 崩溃 .

我认为dask.dataframe的重点是能够操作跨越磁盘上多个文件的大于内存的数据帧,所以我在这里做错了什么?

edit:df = dd.read_csv(path + '/*.csv') 读取文件似乎可以正常工作 . 但是,这不允许我使用文件路径中的其他数据更改每个数据帧 .

edit #2: 遵循MRocklin 's answer, I tried to read my data with dask' s read_bytes() method以及使用single-threaded scheduler以及两者结合使用 . 尽管如此,即使在具有8GB内存的笔记本电脑上以单线程模式读取100MB的块,我的过程迟早会被杀死 . 在一堆类似形状的小文件(每个大约1MB)上运行下面所述的代码可以正常工作 . 我在这里做错了什么想法?

import dask
from dask.bytes import read_bytes
import dask.dataframe as dd
from dask.delayed import delayed
from io import BytesIO
import pandas as pd

def create_df_from_bytesio(bytesio):
    df = pd.read_csv(bytesio)
    return df

def create_bytesio_from_bytes(block):
    bytesio = BytesIO(block)
    return bytesio


path = '/path/to/my/files/*.csv'

sample, blocks = read_bytes(path, delimiter=b'\n', blocksize=1024*1024*100)
delayed_collection = list()
for datafile in blocks:
    for block in datafile:
        bytesio = delayed(create_bytesio_from_bytes)(block)
        df = delayed(create_df_from_bytesio)(bytesio)
        delayed_collection.append(df)

dask_df = dd.from_delayed(delayed_collection)
print(dask_df.compute(get=dask.async.get_sync))

1 回答

  • 8

    如果你的每个文件都很大,那么在Dask有机会变得聪明之前,对 read_file_to_dataframe 的几次并发调用可能会泛滥内存 .

    Dask通过按顺序运行函数来尝试在低内存中运行,以便可以快速删除中间结果 . 但是,如果只有少数函数的结果可以填满内存,那么Dask可能永远不会有机会删除内容 . 例如,如果您的每个函数都生成了2GB的数据帧,并且如果您同时运行了8个线程,那么在Dask的调度策略启动之前,您的函数可能会产生16GB的数据 .

    一些选项

    使用dask.bytes.read_bytes

    read_csv的工作原理是它将大型CSV文件分块为多个~100MB的字节块(参见 blocksize= 关键字参数) . 你也可以这样做,虽然这很棘手,因为你需要总是打破终点 .

    dask.bytes.read_bytes 功能可以帮到你 . 它可以将单个路径转换为 delayed 对象列表,每个对象对应于该文件的字节范围,该字节范围在分隔符上干净地启动和停止 . 然后,您将这些字节放入 io.BytesIO (标准库)并在其上调用 pandas.read_csv . 请注意,您还必须处理 Headers 等 . 该函数的文档字符串很广泛,应该提供更多帮助 .

    使用单个线程

    在上面的例子中,如果我们没有并行度的8倍乘数,一切都会好的 . 我怀疑,如果你只是一次运行一个函数,事情可能会在没有达到内存限制的情况下进行管道传输 . 您可以将dask设置为仅使用具有以下行的单个线程

    dask.set_options(get=dask.async.get_sync)
    

    注意:For Dask版本> = 0.15,您需要使用 dask.local.get_sync 代替 .

    确保结果适合内存(对编辑2的响应)

    如果你创建一个dask.dataframe然后立即计算它

    ddf = dd.read_csv(...)
    df = ddf.compute()
    

    您将所有数据加载到Pandas数据帧中,最终会耗尽内存 . 相反,最好在Dask数据帧上运行,只计算小的结果 .

    # result = df.compute()  # large result fills memory
    result = df.groupby(...).column.mean().compute()  # small result
    

    转换为其他格式

    CSV是一种普遍而实用的格式,但也存在一些缺陷 . 您可以考虑使用HDF5或Parquet等数据格式 .

相关问题