首页 文章

在Python多处理中同步写入共享内存(列表)

提问于
浏览
1

我有以下代码:

import multiprocessing
manager = multiprocessing.Manager()

如果长度小于4则附加列表的函数或创建初始值为“y”的新函数 .

def f(my_array):
    if len(my_array) < 4:
        my_array.append('x')
    else:
        my_array = ['y']
    print(my_array)

初始化列表和创建流程 .

if __name__ == '__main__':
    my_array = manager.list(['a', 'b', 'c'])

    p1 = Process(target=f, args=(my_array))
    p2 = Process(target=f, args=(my_array))
    p3 = Process(target=f, args=(my_array))
    p4 = Process(target=f, args=(my_array))
    p5 = Process(target=f, args=(my_array))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()

输出我得到:

['a', 'b', 'c', 'x']
['y']
['y']
['y'] 
['y']

我不明白为什么列表只附加一次 . 我预计在第三个输出行中我将观察由'x'附加的列表['y'],所以['y','x'],第四个['y','x','x']等等 . 似乎共享内存泄漏或不允许通过多个进程的函数进行更改 . 我该怎么做才能启用目标行为?

1 回答

  • 1

    同步是示例中缺少的一点 . manager.list 只是一个单独的服务器进程中的正常 list ,您的工作进程可以通过代理对象进行修改 . 您的进一步流程恰好同时检查 len(my_array) .

    没有同步,告诉他们他们应该等到另一个进程完成它的操作,包括执行此长度检查并执行依赖于此检查结果的操作 . 您的更新操作不是原子操作,您需要通过在您的操作周围使用manager.lock来使其成为一个操作 .

    您的代码中存在另一个问题,即重新绑定 my_array 以指向正常列表 ['y'] ,而不是更新/修改共享 manager.list . 您没有使用设置 my_array = ['y'] 的进程修改 manager.listmanager.list 保持从第一次修改到第一个工作进程直到程序结束的值 ['a', 'b', 'c', 'x'] .

    from multiprocessing import Process, Manager
    
    
    def f(my_array, lock):
        with lock:
            if len(my_array) < 4:
                my_array.append('x')
            else:
                my_array[:] = []  # clear list inplace by assigning
                # empty list to slice of manager.list
                my_array.append('y')
        print(my_array)
    
    
    if __name__ == '__main__':
    
        N_WORKERS = 5
    
        with Manager() as manager:
    
            my_array = manager.list(['a', 'b', 'c'])
            lock = manager.Lock()
    
            pool = [
                Process(target=f, args=(my_array, lock)) for _ in range(N_WORKERS)
            ]
    
            for p in pool:
                p.start()
            for p in pool:
                p.join()
    
            # Leaving the context-manager block will shut down the manager-process.
            # We need to convert the manager-list to a normal list in the parent
            # to keep its values available for further processing in the parent.
            result = list(my_array)
    
        print(f'result: {result}')
    

    示例输出:

    ['a', 'b', 'c', 'x']
    ['y']
    ['y', 'x']
    ['y', 'x', 'x']
    ['y', 'x', 'x', 'x']
    result: ['y', 'x', 'x', 'x']
    
    Process finished with exit code 0
    

相关问题