Understanding Multiprocessing: Shared memory management, locks and queues in Python

Multiprocessing is a powerful tool in Python, and I want to understand it more in depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share these among all processes.

I came up with the following testing scenarios with four different conditions for multiprocessing:

  • Using a pool and NO Manager

  • Using a pool and a Manager

  • Using single processes and NO Manager

  • Using single processes and a Manager

The Job

All conditions execute a job function the_job. the_job consists of some printing which is secured by a lock. Moreover, the input to the function is simply put into a queue (to see whether it can be recovered from the queue). This input is simply an index idx from range(10), created in the main script called start_scenario (shown at the bottom).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

The success of a condition is defined as perfectly recalling the input from the queue; see the function read_queue at the bottom.

The Conditions

Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue and passing these to a process pool:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(The helper function make_iterator is given at the bottom of this post.) Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.

Condition 2 is rather similar, but now the lock and queue are under the supervision of a manager:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

In condition 3 new processes are started manually, and the lock and queue are created without a manager:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

Condition 4 is similar, but now a manager is used again:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

In both conditions - 3 and 4 - I start a new process for each of the 10 tasks of the_job, with at most ncores processes operating at the very same time. This is achieved with the following helper function:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocesses

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                  args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

The Results

Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance), whereas the other 3 conditions are successful. I am trying to wrap my head around this outcome.

Why does the pool need to share a lock and queue between all processes via a manager, but the individual processes from condition 3 don't?

What I know is that for the pool conditions (1 and 2) all data from the iterators is passed via pickling, whereas in the single-process conditions (3 and 4) all data from the iterators is passed by inheritance from the main process (I am using Linux). I guess that until the memory is changed from within a child process, the same memory that the parental process uses is accessed (copy-on-write). But as soon as one says lock.acquire(), this should be changed and the child processes do use different locks placed somewhere else in memory, don't they? How does one child process know that a sibling has activated a lock that is not shared via a manager?
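
(As an aside, here is an even smaller check, separate from the four conditions above, that isolates the behaviour I am puzzled about. The helper bump below exists only for this illustration: several manually started processes increment a shared counter while holding a lock that was created before forking and passed without any Manager. If each child really ended up with its own private copy of the lock, I would expect the final count to sometimes fall short of 4000 because of lost updates.)

import multiprocessing as mp

def bump(lock, counter):
    # lock and counter are created in the parent and reach the children by
    # inheritance (fork on Linux), not by being pickled as task arguments.
    for _ in range(1000):
        lock.acquire()
        counter.value += 1   # read-modify-write, only safe under the lock
        lock.release()

if __name__ == '__main__':
    lock = mp.Lock()
    counter = mp.Value('i', 0)   # integer living in shared memory
    procs = [mp.Process(target=bump, args=(lock, counter)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)   # 4000 whenever the inherited lock really is shared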

Finally, somewhat related is my question of how different conditions 3 and 4 are. Both use individual processes, but they differ in the usage of a manager. Are both considered to be valid code? Or should one avoid using a manager if there is actually no need for one?


The Full Script

For those who just want to copy and paste everything to execute the code, here is the full script:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocesses

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                  args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

1 Answer

  • 28

    multiprocessing.Lock is implemented using a Semaphore object provided by the OS. On Linux, the child just inherits a handle to the Semaphore from the parent via os.fork. This isn't a copy of the semaphore; the child actually inherits the same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows Semaphore that the multiprocessing.Lock object uses internally, using the Windows DuplicateHandle API, which states:

    The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles.

    The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling it. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.

    Here's the semaphore object in multiprocessing/synchronize.py:

    class SemLock(object):
    
        def __init__(self, kind, value, maxvalue):
            sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
            debug('created semlock with handle %s' % sl.handle)
            self._make_methods()
    
            if sys.platform != 'win32':
                def _after_fork(obj):
                    obj._semlock._after_fork()
                register_after_fork(self, _after_fork)
    
        def _make_methods(self):
            self.acquire = self._semlock.acquire
            self.release = self._semlock.release
            self.__enter__ = self._semlock.__enter__
            self.__exit__ = self._semlock.__exit__
    
        def __getstate__(self):  # This is called when you try to pickle the `Lock`.
            assert_spawning(self)
            sl = self._semlock
            return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
    
        def __setstate__(self, state): # This is called when unpickling a `Lock`
            self._semlock = _multiprocessing.SemLock._rebuild(*state)
            debug('recreated blocker with handle %r' % state[0])
            self._make_methods()
    

    Note the assert_spawning call in __getstate__, which gets called when pickling the object. Here's how that is implemented:

    #
    # Check that the current thread is spawning a child process
    #
    
    def assert_spawning(self):
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                '%s objects should only be shared between processes'
                ' through inheritance' % type(self).__name__
                )
    

    That function is what makes sure you're "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:

    @staticmethod
    def thread_is_spawning():
        return False
    

    This is because Linux doesn't need to pickle to inherit a Lock, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there is more going on:

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)
    
    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()
    
        def __init__(self, process_obj):
            ...
            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()
    
    
        @staticmethod
        def thread_is_spawning():
            return getattr(Popen._tls, 'process_handle', None) is not None
    

    Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to the child using dump, and then the attribute is deleted. So thread_is_spawning is only True during __init__. According to this python-ideas mailing list thread, this is actually an artificial restriction added to simulate the same behavior as os.fork on Linux. Windows actually could support passing the Lock at any time, because DuplicateHandle can be run at any time.

    All of the above applies to Queue objects as well, because they use Lock internally.
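
    (As a quick illustration of that point, separate from the question's script: pickling a plain multiprocessing.Queue outside of process spawning runs through the same __getstate__ / assert_spawning path shown above and raises the analogous error. This is essentially what the Pool does with every task argument.)

    import multiprocessing as mp
    import pickle

    if __name__ == '__main__':
        queue = mp.Queue()
        try:
            pickle.dumps(queue)   # what the Pool does to every task argument
        except RuntimeError as err:
            # "Queue objects should only be shared between processes through inheritance"
            print(err)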

    I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which is going to be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.
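
    (If you want to get a feel for the difference, a rough timing sketch along these lines - the names hammer and n_iter are made up for this example - compares acquire/release on an inherited mp.Lock() against a Manager().Lock() proxy from inside a child process. The exact numbers will vary, but the proxy version pays one IPC round trip per acquire or release.)

    import multiprocessing as mp
    import time

    def hammer(lock, n_iter, label):
        # Acquire and release the given lock n_iter times and report the elapsed time.
        start = time.time()
        for _ in range(n_iter):
            lock.acquire()
            lock.release()
        print('%s: %.3f s for %d acquire/release pairs'
              % (label, time.time() - start, n_iter))

    if __name__ == '__main__':
        n_iter = 10000
        plain_lock = mp.Lock()              # shared semaphore, inherited by the child
        manager_lock = mp.Manager().Lock()  # proxy; every call is IPC to the manager process

        for lck, label in [(plain_lock, 'inherited mp.Lock'),
                           (manager_lock, 'Manager().Lock')]:
            p = mp.Process(target=hammer, args=(lck, n_iter, label))
            p.start()
            p.join()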

    Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, by using the initializer/initargs keyword arguments:

    lock = None
    def initialize_lock(l):
        global lock
        lock = l
    
    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.
    
        """
        lock = mp.Lock()
        mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
        queue = mp.Queue()
    
        iterator = make_iterator(args, queue)
    
        mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` and `make_iterator` would need to be re-written slightly.)
    
        mypool.close()
        mypool.join()
    
        return read_queue(queue)
    

    This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited rather than pickled.
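
    (For completeness, here is a rough, self-contained sketch of what that version could look like; the names initialize_shared, the_job_global_lock and scenario_1_pool_initializer are made up here and are not from the question's script. Since, as noted above, a Queue hits the same pickling restriction, the queue is shared through the initializer as well, so only the plain indices travel through the task arguments.)

    import multiprocessing as mp

    lock = None
    queue = None

    def initialize_shared(l, q):
        # Runs once in every worker; l and q arrive via Process.__init__
        # (i.e. by inheritance), not as pickled task arguments.
        global lock, queue
        lock = l
        queue = q

    def the_job_global_lock(idx):
        # Same idea as the_job, but uses the globals set by the initializer
        # instead of receiving the lock and queue in an argument tuple.
        lock.acquire()
        print(' By run %d' % idx)
        lock.release()
        queue.put(idx)

    def scenario_1_pool_initializer(jobfunc, args, ncores):
        """Pool WITHOUT a Manager; lock and queue shared via initializer/initargs."""
        l = mp.Lock()
        q = mp.Queue()
        mypool = mp.Pool(ncores, initializer=initialize_shared, initargs=(l, q))

        mypool.map(jobfunc, args)   # only the plain indices get pickled now

        mypool.close()
        mypool.join()

        # Same as the question's read_queue helper.
        results = []
        while not q.empty():
            results.append(q.get())
        return results

    if __name__ == '__main__':
        print(scenario_1_pool_initializer(the_job_global_lock, range(10), 3))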
