
Python multiprocessing pool.map for multiple arguments


In the Python multiprocessing library, is there a variant of pool.map that supports multiple arguments?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()

16 Answers

  • 22

    After learning about itertools in J.F. Sebastian's answer, I decided to take it a step further and write a parmap package that takes care of parallelization, offering map and starmap functions on Python 2.7 and Python 3.2 (and later) that can take any number of positional arguments.

    Installation:

    pip install parmap
    

    How to parallelize:

    import parmap
    # If you want to do:
    y = [myfunction(x, argument1, argument2) for x in mylist]
    # In parallel:
    y = parmap.map(myfunction, mylist, argument1, argument2)
    
    # If you want to do:
    z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
    # In parallel:
    z = parmap.starmap(myfunction, mylist, argument1, argument2)
    
    # If you want to do:
    listx = [1, 2, 3, 4, 5, 6]
    listy = [2, 3, 4, 5, 6, 7]
    param1 = 3.14
    param2 = 42
    listz = []
    for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
    # In parallel:
    listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
    

    I have uploaded parmap to PyPI and to a GitHub repository.

    As an example, the question can be answered as follows:

    import parmap
    
    def harvester(case, text):
        X = case[0]
        return text + str(X)
    
    if __name__ == "__main__":
        case = RAW_DATASET  # assuming this is an iterable
        parmap.map(harvester, case, "test", chunksize=1)
    
  • 7

    Another approach is to pass a list of lists to a one-argument routine:

    import os
    from multiprocessing import Pool
    
    def task(args):
        print("PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1])
    
    if __name__ == '__main__':
        pool = Pool()
        pool.map(task, [
            [1, 2],
            [3, 4],
            [5, 6],
            [7, 8],
        ])
    

    You can build the list of arguments in whatever way you prefer.
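    As a sketch of that idea (the work function and value lists are placeholders, not from the original answer), such an argument list can be built with a list comprehension or with itertools.product:

    ```python
    from itertools import product
    from multiprocessing import Pool

    def work(args):
        # unpack the [x, y] pair that arrives as a single argument
        x, y = args
        return x * y

    if __name__ == '__main__':
        # build the argument list with a comprehension ...
        pairs = [[x, 10] for x in range(4)]
        # ... or as the cartesian product of two value lists
        pairs = [list(p) for p in product([1, 2], [3, 4])]
        with Pool(2) as pool:
            print(pool.map(work, pairs))  # [3, 4, 6, 8]
    ```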

  • 0

    # "How to take multiple arguments"

    def f1(args):
        a, b, c = args
        return a+b+c
    
    if __name__ == "__main__":
        import multiprocessing
        pool = multiprocessing.Pool(4) 
    
        result1 = pool.map(f1, [ [1,2,3] ])
        print(result1)
    
  • 5

    You can use the following two functions to avoid writing a wrapper for each new function:

    import itertools
    from multiprocessing import Pool
    
    def universal_worker(input_pair):
        function, args = input_pair
        return function(*args)
    
    def pool_args(function, *args):
        return zip(itertools.repeat(function), zip(*args))
    

    Use them with a function `function` and argument lists `arg_0`, `arg_1`, and `arg_2` as follows:

    pool = Pool(n_core)
    list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2))
    pool.close()
    pool.join()
    
  • 352

    A better way is to use a decorator instead of writing a wrapper function by hand. Especially when you have many functions to map, a decorator saves time by sparing you a wrapper for every function. Usually a decorated function is not picklable, but we can use functools to get around that. More discussion can be found here.

    Here is an example:

    def unpack_args(func):
        from functools import wraps
        @wraps(func)
        def wrapper(args):
            if isinstance(args, dict):
                return func(**args)
            else:
                return func(*args)
        return wrapper
    
    @unpack_args
    def func(x, y):
        return x + y
    

    然后你可以用压缩参数映射它

    from multiprocessing import Pool
    
    np, xlist, ylist = 2, range(10), range(10)
    pool = Pool(np)
    res = pool.map(func, zip(xlist, ylist))
    pool.close()
    pool.join()
    

    Of course, you can always use Pool.starmap in Python 3 (>= 3.3), as mentioned in other answers.
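    For comparison, a minimal Pool.starmap sketch of the same zipped-arguments example (assuming Python 3.3+; no decorator is required):

    ```python
    from multiprocessing import Pool

    def func(x, y):
        return x + y

    if __name__ == '__main__':
        xlist, ylist = range(10), range(10)
        with Pool(2) as pool:
            # starmap unpacks each (x, y) tuple itself
            res = pool.starmap(func, zip(xlist, ylist))
        print(res)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    ```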

  • 6

    I think the following is better:

    def multi_run_wrapper(args):
        return add(*args)
    def add(x,y):
        return x+y
    if __name__ == "__main__":
        from multiprocessing import Pool
        pool = Pool(4)
        results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
        print(results)
    

    Output:

    [3, 5, 7]
    
  • 9

    The official documentation states that pool.map supports only one iterable argument. I like to use apply_async in such cases. In your case, I would do:

    from multiprocessing import Process, Pool, Manager
    
    text = "test"
    
    def harvester(text, case, q=None):
        X = case[0]
        res = text + str(X)
        if q:
            q.put(res)
        return res
    
    def block_until(q, results_queue, until_counter=0):
        i = 0
        while i < until_counter:
            results_queue.put(q.get())
            i += 1
    
    if __name__ == '__main__':
        pool = Pool(processes=6)
        case = RAW_DATASET
        m = Manager()
        q = m.Queue()
        results_queue = m.Queue()  # when it completes, results will reside in this queue
        blocking_process = Process(target=block_until, args=(q, results_queue, len(case)))
        blocking_process.start()
        for c in case:
            try:
                res = pool.apply_async(harvester, (text, c, q))
                res.get(timeout=0.1)
            except:
                pass
        blocking_process.join()
    
  • 39

    There is a fork of multiprocessing called pathos (note: use the version on GitHub) that doesn't need starmap: its map functions mirror the API of Python's map, so map can take multiple arguments. With pathos, you can also generally do multiprocessing in the interpreter, instead of being stuck in the __main__ block. Pathos is due for a release after some mild updating, mostly conversion to Python 3.x.

    Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
      [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
      Type "help", "copyright", "credits" or "license" for more information.
      >>> def func(a,b):
      ...     print a,b
      ...
      >>>
      >>> from pathos.multiprocessing import ProcessingPool    
      >>> pool = ProcessingPool(nodes=4)
      >>> pool.map(func, [1,2,3], [1,1,1])
      1 1
      2 1
      3 1
      [None, None, None]
      >>>
      >>> # also can pickle stuff like lambdas 
      >>> result = pool.map(lambda x: x**2, range(10))
      >>> result
      [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
      >>>
      >>> # also does asynchronous map
      >>> result = pool.amap(pow, [1,2,3], [4,5,6])
      >>> result.get()
      [1, 32, 729]
      >>>
      >>> # or can return a map iterator
      >>> result = pool.imap(pow, [1,2,3], [4,5,6])
      >>> result
      <processing.pool.IMapIterator object at 0x110c2ffd0>
      >>> list(result)
      [1, 32, 729]
    
  • 6
    import multiprocessing
    
    text = "test"
    
    def unpack(args):
        return args[0](*args[1:])
    
    def harvester(text, case):
        X = case[0]
        return text + str(X)
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=6)
        case = RAW_DATASET
        # args is a list of tuples 
        # with the function to execute as the first item in each tuple
        args = [(harvester, text, c) for c in case]
        # doing it this way, we can pass any function
        # and we don't need to define a wrapper for each different function
        # if we need to use more than one
        pool.map(unpack, args)
        pool.close()
        pool.join()
    
  • 201

    The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described below by J.F. Sebastian. It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:

    import multiprocessing
    from itertools import product
    
    def merge_names(a, b):
        return '{} & {}'.format(a, b)
    
    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with multiprocessing.Pool(processes=3) as pool:
            results = pool.starmap(merge_names, product(names, repeat=2))
        print(results)
    
    # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
    

    For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)

    import multiprocessing
    from itertools import product
    from contextlib import contextmanager
    
    def merge_names(a, b):
        return '{} & {}'.format(a, b)
    
    def merge_names_unpack(args):
        return merge_names(*args)
    
    @contextmanager
    def poolcontext(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
    
    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with poolcontext(processes=3) as pool:
            results = pool.map(merge_names_unpack, product(names, repeat=2))
        print(results)
    
    # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
    

    In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.

    import multiprocessing
    from functools import partial
    from contextlib import contextmanager
    
    @contextmanager
    def poolcontext(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
    
    def merge_names(a, b):
        return '{} & {}'.format(a, b)
    
    if __name__ == '__main__':
        names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
        with poolcontext(processes=3) as pool:
            results = pool.map(partial(merge_names, b='Sons'), names)
        print(results)
    
    # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
    

    This owes a great deal to his answer, which probably should have been accepted instead. But since this question is stuck at the top, it seemed best to improve it for future readers.

  • 0

    For Python 2, you can bind the extra argument with functools.partial (a plain lambda will not work here, because multiprocessing cannot pickle lambdas):

    from functools import partial
    import multiprocessing
    
    def fun(a, b):
        return a + b
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=6)
        b = 233
        print(pool.map(partial(fun, b=b), range(1000)))
    
  • 2

    Using Python 3.3+ with pool.starmap():

    from multiprocessing.dummy import Pool as ThreadPool 
    
    def write(i, x):
        print(i, "---", x)
    
    a = ["1","2","3"]
    b = ["4","5","6"] 
    
    pool = ThreadPool(2)
    pool.starmap(write, zip(a,b)) 
    pool.close() 
    pool.join()
    

    Result:

    1 --- 4
    2 --- 5
    3 --- 6
    

    If you like, you can also zip() more arguments: zip(a, b, c, d, e).

    If you want a constant value passed as an argument, you have to use import itertools and then zip(itertools.repeat(constant), a).
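    For example, a sketch of that itertools.repeat pattern, using a variant of the write function above that returns instead of printing (the constant string "k" is arbitrary):

    ```python
    import itertools
    from multiprocessing.dummy import Pool as ThreadPool

    def write(i, x):
        return i + " --- " + x

    a = ["1", "2", "3"]

    pool = ThreadPool(2)
    # pair every element of `a` with the same constant first argument
    results = pool.starmap(write, zip(itertools.repeat("k"), a))
    pool.close()
    pool.join()
    print(results)  # ['k --- 1', 'k --- 2', 'k --- 3']
    ```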

  • -1

    Is there a variant of pool.map which supports multiple arguments?

    Python 3.3 includes the pool.starmap() method:

    #!/usr/bin/env python3
    from functools import partial
    from itertools import repeat
    from multiprocessing import Pool, freeze_support
    
    def func(a, b):
        return a + b
    
    def main():
        a_args = [1,2,3]
        second_arg = 1
        with Pool() as pool:
            L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
            M = pool.starmap(func, zip(a_args, repeat(second_arg)))
            N = pool.map(partial(func, b=second_arg), a_args)
            assert L == M == N
    
    if __name__=="__main__":
        freeze_support()
        main()
    

    对于旧版本:

    #!/usr/bin/env python2
    import itertools
    from multiprocessing import Pool, freeze_support
    
    def func(a, b):
        print a, b
    
    def func_star(a_b):
        """Convert `f([1,2])` to `f(1,2)` call."""
        return func(*a_b)
    
    def main():
        pool = Pool()
        a_args = [1,2,3]
        second_arg = 1
        pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
    
    if __name__=="__main__":
        freeze_support()
        main()
    

    Output:

    1 1
    2 1
    3 1
    

    Notice how itertools.izip() and itertools.repeat() are used here.

    Due to the bug mentioned by @unutbu, you can't use functools.partial() or similar capabilities on Python 2.6, so the simple wrapper function func_star() should be defined explicitly. See also the workaround suggested by uptimebox.

  • 3

    A better solution for Python 2:

    from multiprocessing import Pool
    def func((i, (a, b))):
        print i, a, b
        return a + b
    pool = Pool(3)
    pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
    

    Output (in arbitrary order):
    
    2 3 4
    1 2 3
    0 1 2
    
    Return value:
    
    [3, 5, 7]

  • 112

    Another simple alternative (Python 2 only, since it relies on tuple parameter unpacking in the function signature) is to wrap your function parameters in a tuple and then also wrap the arguments to be passed in tuples. This is perhaps not ideal when dealing with large pieces of data; I believe it makes a copy of each tuple.

    from multiprocessing import Pool
    
    def f((a,b,c,d)):
        print a,b,c,d
        return a + b + c +d
    
    if __name__ == '__main__':
        p = Pool(10)
        data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
        print(p.map(f, data))
        p.close()
        p.join()
    

    This gives output in random order:

    0 1 2 3
    1 2 3 4
    2 3 4 5
    3 4 5 6
    4 5 6 7
    5 6 7 8
    7 8 9 10
    6 7 8 9
    8 9 10 11
    9 10 11 12
    [6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
    
  • 2

    Starting with Python 3.4.4, you can use multiprocessing.get_context() to obtain a context object that lets you use multiple start methods:

    import multiprocessing as mp
    
    def foo(q, h, w):
        q.put(h + ' ' + w)
        print(h + ' ' + w)
    
    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        q = ctx.Queue()
        p = ctx.Process(target=foo, args=(q,'hello', 'world'))
        p.start()
        print(q.get())
        p.join()
    

    Or you can simply replace

    pool.map(harvester(text,case),case, 1)
    

    with:

    pool.apply_async(harvester, (text, case))
    
