首页 文章

Python:在衍生进程之间共享锁定

提问于
浏览
2

最终目标是在后台执行一个方法,但不是并行执行:当多个对象调用此方法时,每个对象都应该等待轮到他们继续 . 要在后台运行,我必须在子进程(而不是线程)中运行该方法,我需要使用spawn(而不是fork)来启动它 . 为了防止并行执行,显而易见的解决方案是在进程之间共享全局锁 .
当进程被分叉时,这是Unix上的默认进程,很容易实现,正如以下两个代码中所强调的那样 .
我们可以将它作为类变量共享:

import multiprocessing as mp
from time import sleep

class OneAtATime:

    l = mp.Lock()

    def f(self):
        with self.l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    p1 = mp.Process(target = a.f)
    p2 = mp.Process(target = b.f)
    p1.start()
    p2.start()

或者我们可以将它传递给方法:

import multiprocessing as mp
from time import sleep

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = mp.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

这两个代码都具有以一秒间隔打印"hello"的适当行为 . 但是,当将start method更改为'spawn'时,它们会被破坏 .
第一个(1)同时打印"hello" . 这是因为the internal state of a class is not pickled,所以他们没有相同的锁 .
第二个(2)在运行时因FileNotFoundError而失败 . 我认为这与锁不能被腌制这一事实有关:见Python sharing a lock between processes .
在这个答案中,建议两个修复(旁注:我不能使用池,因为我想随机创建任意数量的进程) .
我还没有找到适应第二个修复的方法,但我尝试实现第一个修复:

import multiprocessing as mp
from time import sleep

if __name__ == "__main__":
    mp.set_start_method('spawn')

class OneAtATime:
    def f(self, l):
        with l:
            sleep(1)
        print("Hello")

if __name__ == "__main__":
    a = OneAtATime()
    b = OneAtATime()
    m = mp.Manager()
    l = m.Lock()
    p1 = mp.Process(target = a.f, args = (l,))
    p2 = mp.Process(target = b.f, args = (l,))
    p1.start()
    p2.start()

这与AttributeError和FileNotFoundError(3)失败 . 事实上,当使用fork方法时它也会失败(BrokenPipe)(4) .
What is the proper way of sharing a lock between spawned processes ?
快速解释这四个失败我编号也会很好 . 我在Archlinux下运行Python 3.6 .

2 回答

  • 1

    恭喜你,你有90%的方式在那里 . 最后一步实际上并不是很难做到 .

    是的,您的最终代码块因AttributeError而失败,但具体是什么错误? “无法获取'OneAtATime'属性” . 这与您已经遇到的问题非常相似 - 它并没有腌制OneAtATime类 .

    我做了以下更改,它按照您的意愿工作:

    file ooat.py:

    from time import sleep
    
    class OneAtATime:
        def f(self, l):
            with l:
                sleep(1)
            print("Hello")
    

    交互式shell:

    import multiprocessing as mp
    from oaat import OneAtATime
    if __name__ == "__main__":
        mp.set_start_method('spawn')
        a = OneAtATime()
        b = OneAtATime()
        m = mp.Manager()
        l = m.Lock()
        p1 = mp.Process(target = a.f, args = (l,))
        p2 = mp.Process(target = b.f, args = (l,))
        p1.start()
        p2.start()
    

    您可能会注意到,我并没有真正做任何事情 - 只需将您的代码拆分为两个单独的文件即可 . 尝试一下,你会发现它工作正常 . (至少,它对我来说,在ubuntu上使用python 3.5 . )

  • 1

    如果脚本没有过早退出,则最后一个代码段可以正常工作 . 加入流程就足够了:

    import multiprocessing as mp
    from time import sleep
    
    class OneAtATime:
        def f(self, l):
            with l:
                sleep(1)
            print("Hello")
    
    if __name__ == "__main__":
        mp.set_start_method('spawn')
        a = OneAtATime()
        b = OneAtATime()
        m = mp.Manager()
        l = m.Lock()
        p1 = mp.Process(target = a.f, args = (l,))
        p2 = mp.Process(target = b.f, args = (l,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    

    有关它导致的错误的更多信息https://stackoverflow.com/a/25456494/8194503 .

相关问题