python或dask并行生成器?

是否有可能在python中(可能使用dask,可能使用多处理)在核心上“放置”生成器,然后并行地逐步完成生成器并处理结果?

它需要特别是生成器(或具有 __iter__ 的对象);生成器产生的所有生成元素的列表将不适合内存 .

特别是:

使用pandas,我可以调用 read_csv(...iterator=True) ,这给了我一个迭代器(TextFileReader) - 我可以 for in 它或者多次显式调用next . 整个csv永远不会被读入内存 . 尼斯 .

每次我从迭代器中读取下一个块时,我也会对它执行一些昂贵的计算 .

但现在我有2个这样的文件 . 我想创建2个这样的生成器,并在一个核心上“放置”1,在另一个核心上放置1个,这样我就可以:

result = expensive_process(next(iterator))

在每个核心上,并行,然后组合并返回结果 . 重复此步骤,直到一个发电机或两个发电机都没有产量 .

看起来TextFileReader不是pickleable,也不是生成器 . 我无法在dask或多处理中找到如何执行此操作 . 有这种模式吗?

回答(2)

2 years ago

Dask的read_csv旨在以块的形式加载来自多个文件的数据,并且可以指定一个块大小 . 当您对结果数据帧进行操作时,您将以chunk方式工作,这正是首先使用Dask的重点 . 应该不需要使用迭代器方法 .

您最想要使用的dask数据帧方法是 map_partitions() .

如果你真的想使用迭代器的想法,你应该查看 dask.delayed ,它能够并行化任意python函数,通过向函数发送函数的每个调用(每个函数使用不同的文件名) .

2 years ago

幸运的是,我认为这个问题可以很好地映射到python的多处理.Process和.Queue .

def data_generator(whatever):
   for v in something(whatever):
      yield v

def generator_constructor(whatever):
   def generator(outputQueue):
      for d in data_generator(whatever):
         outputQueue.put(d)
      outputQueue.put(None) # sentinel
   return generator

def procSumGenerator():
   outputQs = [Queue(size) for _ in range(NumCores)]
   procs = [Process(target=generator_constructor(whatever),
                    args=(outputQs[i],))
            for i in range(NumCores)] 

   for proc in procs: proc.start()

   # until any output queue returns a None, collect 
   # from all and yield
   done = False
   while not done:
      results = [oq.get() for oq in outputQs]
      done = any(res is None for res in results)
      if not done:
         yield some_combination_of(results)

   for proc in procs: terminate()

for v in procSumGenerator():
   print(v)

也许这可以用Dask做得更好?我发现我的解决方案相当快地使网络饱和了大量生成的数据 - 我正在使用pandas操作csv并返回大型numpy数组 .

https://github.com/colinator/doodle_generator/blob/master/data_generator_uniform_final.ipynb