首页 文章

Python3在父/子进程之间共享数组

提问于
浏览
0

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Array

What I’m trying to do 在MainProcess中创建一个数组,并通过继承将其发送到任何后续子进程 . 子进程将更改数组 . 父进程将查找更改并采取相应措施 .

The problem 父进程没有"see"子进程所做的任何更改 . 但是,子进程会进行更改"see" . 即如果孩子1添加了一个项目,那么孩子2将会看到该项目等

对于sARRAY和iARRAY以及iVALUE都是如此 .

BUT 虽然父进程似乎无视数组值,但它确实注意到对iVALUE所做的更改 .

我不明白我做错了什么 .

更新2 https://stackoverflow.com/a/6455704/1267259混淆的主要原因是多处理使用单独的进程而不是线程 . 这意味着父级不会自动看到子级对对象状态所做的任何更改 . 澄清 . 我想做的是可能的,对吧? https://stackoverflow.com/a/26554759/1267259我的意思是多处理数组和值的目的是在子进程和父进程之间进行通信?而iVALUE也是如此......

我发现了这个Shared Array not shared correctly in python multiprocessing

但我不明白答案“分配给所有过程都有意义的 Value 似乎有所帮助:”

UPDATE 1找到Python:多处理和c_char_p数组>“arr [i]指向arr [i]指向一个只对进行赋值的子进程有意义的内存地址 . 其他子进程在查看该地址时检索垃圾“ . 据我了解,这不适用于这个问题 . 在这种情况下,一个子进程对数组的赋值对其他子进程有意义 . 但为什么主流程没有意义呢?

我知道“管理员”,但感觉像Array应该足以满足这个用例 . 我已阅读手册,但显然我似乎没有得到它 .

更新3确实,这工作管理= multiprocessing.Manager()
manage = list(range(3))
所以...

What am I doing wrong?

import multiprocessing
import ctypes

class MainProcess:

    # keep track of process
    iVALUE = multiprocessing.Value('i',-1) # this works
    # keep track of items
    sARRAY = multiprocessing.Array(ctypes.c_wchar_p, 1024) # this works between child processes
    iARRAY = multiprocessing.Array(ctypes.c_int, 3) # this works between child processes

    pLOCK = multiprocessing.Lock()

def __init__(self):
    # create an index for each process
    self.sARRAY.value = [None] * 3
    self.iARRAY.value = [None] * 3

def InitProcess(self):
    # list of items to process
    items = []
    item = (i for i in items)
    with(multiprocessing.Pool(3)) as pool:
        # main loop: keep looking for updated values
        while True:
            try:
                pool.apply_async(self.worker, (next(item),callback=eat_finished_cake))
            except StopIteration:
                pass

            print(self.sARRAY) # yields [None][None][None]
            print(self.iARRAY) # yields [None][None][None]
            print(self.iVALUE) # yields 1-3

    pool.close()
    pool.join()

def worker(self,item):

    with self.pLOCK:
        self.iVALUE.value += 1

    self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'
    self.iARRAY.value[self.iVALUE.value] = 2
    # on next child process run
    print(self.iVALUE.value) # prints 1
    print(self.sARRAY.value) # prints ['item 1'][None][None]
    print(self.iARRAY.value) # prints [2][None][None]

    sleep(0.5)

    ...
    with self.pLOCK:
        self.iVALUE.value -= 1

UPDATE 4 改变

pool.apply_async(self.worker, (next(item),))

x = pool.apply_async(self.worker, (next(item),))
print(x.get())

要么

x = pool.apply(self.worker, (next(item),))
print(x)

在自我中 . worker()返回self.iARRAY.value或self.sARRAY.value会返回一个具有更新值的变量 . 这不是我想要实现的,这不是事件需要使用ARRAY来实现...

所以我需要澄清一下 . 在self.worker()中,我正在做重要的繁重工作,这可能需要很长时间,我需要将信息发送回主进程,例如在返回值完成之前的进度被发送到回调 .

我不希望将完成的工作结果返回到main方法/由回调函数处理 . 我现在看到,在代码示例中省略回调可能会给人留下不同的印象 .

UPDATE 5 回复:Use numpy array in shared memory for multiprocessing

我已经看到了这个答案,并尝试使用带有全局变量的initilaizer()并通过initargs传递数组而没有运气 . 我不理解若虫的使用和“closing()”,但该代码似乎不访问main()内的“arr”,尽管使用了shared_arr,但仅在p.join()之后 .

据我所知,数组被声明然后转为若虫并通过init(x)继承 . 到目前为止,我的代码应该具有与该代码相同的行为 .

一个主要的区别似乎是如何访问数组

当我尝试时,我只使用属性值成功设置并获取数组值

self.iARRAY[0] = 1 # instead of iARRAY.value = [None] * 3
self.iARRAY[1] = 1
self.iARRAY[2] = 1

print(self.iARRAY) # prints <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_int_Array_3 object at 0x7f9cfa8538c8>>

我找不到访问和检查值的方法(属性“值”给出一个未知的方法错误)

与该代码的另一个主要区别是使用get_obj()防止数据复制 .

这不是一个nymphy问题吗?

assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

不知道如何利用它 .

def worker(self,item):

    with self.pLOCK:
        self.iVALUE.value += 1

    self.sARRAY.value[self.iVALUE.value] = item # value: 'item 1'

    with self.iARRAY.get_lock():
        arr = self.iARRAY.get_obj()
        arr[self.iVALUE.value] = 2   # and now ??? 

    sleep(0.5)

    ...
    with self.pLOCK:
        self.iVALUE.value -= 1

UPDATE 6 我尝试过使用multiprocessing.Process()而不是Pool()但结果是一样的 .

2 回答

  • 1

    声明全局变量的正确方法(在本例中为class属性)

    iARRAY = multiprocessing.Array(ctypes.c_int, range(3))
    

    正确的设定 Value 的方法

    self.iARRAY[n] = x
    

    获得 Value 的正确方法

    self.iARRAY[n]
    

    不知道为什么我见过的例子使用了Array(ctypes.c_int,3)和iARRAY.value [n]但是在这种情况下错了

  • 0

    这是你的问题:

    while True:
        try:
            pool.apply_async(self.worker, (next(item),))
        except StopIteration:
            pass
    
        print(self.sARRAY) # yields [None][None][None]
        print(self.iARRAY) # yields [None][None][None]
        print(self.iVALUE) # yields 1-3
    

    函数 pool.apply_async() 启动子进程并立即返回 . 你似乎没有等待 Worker 完成 . 为此,您可以考虑使用barrier .

相关问题