首页 文章

python或dask并行生成器?

提问于
浏览 371
1

是否有可能在python中(可能使用dask,可能使用多处理)在核心上“放置”生成器,然后并行地逐步完成生成器并处理结果?

它需要特别是生成器(或具有 __iter__ 的对象);生成器产生的所有生成元素的列表将不适合内存 .

特别是:

使用pandas,我可以调用 read_csv(...iterator=True) ,这给了我一个迭代器(TextFileReader) - 我可以 for in 它或者多次显式调用next . 整个csv永远不会被读入内存 . 尼斯 .

每次我从迭代器中读取下一个块时,我也会对它执行一些昂贵的计算 .

但现在我有2个这样的文件 . 我想创建2个这样的生成器,并在一个核心上“放置”1,在另一个核心上放置1个,这样我就可以:

result = expensive_process(next(iterator))

在每个核心上,并行,然后组合并返回结果 . 重复此步骤,直到一个发电机或两个发电机都没有产量 .

看起来TextFileReader不是pickleable,也不是生成器 . 我无法在dask或多处理中找到如何执行此操作 . 有这种模式吗?

2 回答

  • 1

    幸运的是,我认为这个问题可以很好地映射到python的多处理.Process和.Queue .

    def data_generator(whatever):
       for v in something(whatever):
          yield v
    
    def generator_constructor(whatever):
       def generator(outputQueue):
          for d in data_generator(whatever):
             outputQueue.put(d)
          outputQueue.put(None) # sentinel
       return generator
    
    def procSumGenerator():
       outputQs = [Queue(size) for _ in range(NumCores)]
       procs = [Process(target=generator_constructor(whatever),
                        args=(outputQs[i],))
                for i in range(NumCores)] 
    
       for proc in procs: proc.start()
    
       # until any output queue returns a None, collect 
       # from all and yield
       done = False
       while not done:
          results = [oq.get() for oq in outputQs]
          done = any(res is None for res in results)
          if not done:
             yield some_combination_of(results)
    
       for proc in procs: terminate()
    
    for v in procSumGenerator():
       print(v)
    

    也许这可以用Dask做得更好?我发现我的解决方案相当快地使网络饱和了大量生成的数据 - 我正在使用pandas操作csv并返回大型numpy数组 .

    https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb

  • 0

    Dask的read_csv旨在以块的形式加载来自多个文件的数据,并且可以指定一个块大小 . 当您对结果数据帧进行操作时,您将以chunk方式工作,这正是首先使用Dask的重点 . 应该不需要使用迭代器方法 .

    您最想要使用的dask数据帧方法是 map_partitions() .

    如果你真的想使用迭代器的想法,你应该查看 dask.delayed ,它能够并行化任意python函数,通过向函数发送函数的每个调用(每个函数使用不同的文件名) .

相关问题