首页 文章

在python多 生产环境 者和多用户线程中,queue.join()可能不可靠吗?

提问于
浏览
1

一个python多 生产环境 者和多消费者线程伪代码:

def threadProducer():
    while upstreams_not_done:
        data = do_some_work()
        queue_of_data.put(data)

def threadConsumer():
    while True:
        data = queue_of_data.get()
        do_other_work()
        queue_of_data.task_done()

queue_of_data = queue.Queue()

list_of_producers = create_and_start_producers()
list_of_consumers = create_and_start_consumers()

queue_of_data.join()
# is now all work done?

其中为队列中的每个项目调用 queue_of_data.task_done() .

producers work slower then consumers 时,是否有可能 queue_of_data.join() non-blocks at some moment when no producer generates data yet, but all consumers finish their taskstask_done()

如果 Queue.join() 不是这样可靠的,我该如何检查所有工作是否完成?

1 回答

  • 2

    通常的方法是在生成器完成时在队列中放置一个sentinel值(如 None ),每个消费者线程一个 . 然后编写消费者在从队列中拉出 None 时退出该线程 .

    所以,例如,在主程序中:

    for t in list_of_producers:
        t.join()
    # Now we know all producers are done.
    for t in list_of_consumers:
        queue_of_data.put(None)  # tell a consumer we're done
    for t in list_of_consumers:
        t.join()
    

    和消费者看起来像:

    def threadConsumer():
        while True:
            data = queue_of_data.get()
            if data is None:
                break
            do_other_work()
    

    注意:如果 生产环境 者可以压倒消费者,请创建具有最大大小的队列 . 然后 queue.put() 将在队列达到该大小时阻塞,直到消费者从队列中删除某些内容 .

相关问题