首页 文章

多处理中的共享内存对象

提问于
浏览
84

假设我有一个大内存numpy数组,我有一个函数 func ,它接受这个巨大的数组作为输入(连同一些其他参数) . 具有不同参数的 func 可以并行运行 . 例如:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中 .

有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改 .

更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?

[EDITED]

我读了答案,但我仍然有点困惑 . 由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本 . 但是下面的代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

输出(顺便说一下,随着数组大小的增加,成本也会增加,所以我怀疑仍有与内存复制相关的开销):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

如果我们不复制数组,为什么会有这么大的开销?共享内存拯救了我的哪一部分?

2 回答

  • 86

    如果您使用的是使用copy-on-write fork() 语义的操作系统(如任何常见的unix),那么只要您永远不会改变您的数据结构,它就可供所有子进程使用而不占用额外的内存 . 你不必做任何特别的事情(除了绝对确保你不改变对象) .

    The most efficient thing you can do for your problem 将您的数组打包成一个有效的数组结构(使用 numpyarray),将其放在共享内存中,用 multiprocessing.Array 包装,并将其传递给您的函数 . This answer shows how to do that .

    如果你想要一个可写的共享对象,那么你需要用某种同步或锁定来包装它 . multiprocessing 提供two methods of doing this:一个使用共享内存(适用于简单值,数组或ctypes)或 Manager 代理,其中一个进程保存内存,管理器仲裁从其他进程(甚至通过网络)对它的访问 .

    Manager 方法可以与任意Python对象一起使用,但是比使用共享内存的等效方法慢,因为对象需要序列化/反序列化并在进程之间发送 .

    有一个wealth of parallel processing libraries and approaches available in Python . multiprocessing 是一个优秀且全面的图书馆,但如果您有特殊需求,或许其他方法可能会更好 .

  • 13

    我遇到了同样的问题,写了一个小的共享内存实用程序类来解决它 .

    我正在使用multiprocessing.RawArray(lockfree),并且对数组的访问根本不同步(lockfree),小心不要自己动手 .

    通过该解决方案,我在四核i7上获得了大约3倍的加速 .

    这是代码:随意使用和改进它,请报告任何错误 .

    '''
    Created on 14.05.2013
    
    @author: martin
    '''
    
    import multiprocessing
    import ctypes
    import numpy as np
    
    class SharedNumpyMemManagerError(Exception):
        pass
    
    '''
    Singleton Pattern
    '''
    class SharedNumpyMemManager:    
    
        _initSize = 1024
    
        _instance = None
    
        def __new__(cls, *args, **kwargs):
            if not cls._instance:
                cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                    cls, *args, **kwargs)
            return cls._instance        
    
        def __init__(self):
            self.lock = multiprocessing.Lock()
            self.cur = 0
            self.cnt = 0
            self.shared_arrays = [None] * SharedNumpyMemManager._initSize
    
        def __createArray(self, dimensions, ctype=ctypes.c_double):
    
            self.lock.acquire()
    
            # double size if necessary
            if (self.cnt >= len(self.shared_arrays)):
                self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)
    
            # next handle
            self.__getNextFreeHdl()        
    
            # create array in shared memory segment
            shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))
    
            # convert to numpy array vie ctypeslib
            self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)
    
            # do a reshape for correct dimensions            
            # Returns a masked array containing the same data, but with a new shape.
            # The result is a view on the original array
            self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)
    
            # update cnt
            self.cnt += 1
    
            self.lock.release()
    
            # return handle to the shared memory numpy array
            return self.cur
    
        def __getNextFreeHdl(self):
            orgCur = self.cur
            while self.shared_arrays[self.cur] is not None:
                self.cur = (self.cur + 1) % len(self.shared_arrays)
                if orgCur == self.cur:
                    raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')
    
        def __freeArray(self, hdl):
            self.lock.acquire()
            # set reference to None
            if self.shared_arrays[hdl] is not None: # consider multiple calls to free
                self.shared_arrays[hdl] = None
                self.cnt -= 1
            self.lock.release()
    
        def __getArray(self, i):
            return self.shared_arrays[i]
    
        @staticmethod
        def getInstance():
            if not SharedNumpyMemManager._instance:
                SharedNumpyMemManager._instance = SharedNumpyMemManager()
            return SharedNumpyMemManager._instance
    
        @staticmethod
        def createArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)
    
        @staticmethod
        def getArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)
    
        @staticmethod    
        def freeArray(*args, **kwargs):
            return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)
    
    # Init Singleton on module load
    SharedNumpyMemManager.getInstance()
    
    if __name__ == '__main__':
    
        import timeit
    
        N_PROC = 8
        INNER_LOOP = 10000
        N = 1000
    
        def propagate(t):
            i, shm_hdl, evidence = t
            a = SharedNumpyMemManager.getArray(shm_hdl)
            for j in range(INNER_LOOP):
                a[i] = i
    
        class Parallel_Dummy_PF:
    
            def __init__(self, N):
                self.N = N
                self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
                self.pool = multiprocessing.Pool(processes=N_PROC)
    
            def update_par(self, evidence):
                self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))
    
            def update_seq(self, evidence):
                for i in range(self.N):
                    propagate((i, self.arrayHdl, evidence))
    
            def getArray(self):
                return SharedNumpyMemManager.getArray(self.arrayHdl)
    
        def parallelExec():
            pf = Parallel_Dummy_PF(N)
            print(pf.getArray())
            pf.update_par(5)
            print(pf.getArray())
    
        def sequentialExec():
            pf = Parallel_Dummy_PF(N)
            print(pf.getArray())
            pf.update_seq(5)
            print(pf.getArray())
    
        t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
        t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")
    
        print("Sequential: ", t1.timeit(number=1))    
        print("Parallel: ", t2.timeit(number=1))
    

相关问题