我正在尝试使用Python 2.7中的Queue.Queue实现多线程 生产环境 者 - 消费者模式 . 我试图找出如何使消费者,即 Worker 线程,一旦完成所有必要的工作,停止 .
请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080
发送'我完成'任务,指示池线程终止 . 任何获得此类任务的线程都会重新排队,然后自杀 .
但这对我不起作用 . 例如,请参阅以下代码 .
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
在所有三个工作人员都已从队列中读取退出指示符(即 -1
)之后,此程序挂起,因为每个工作程序在退出之前重新排队 -1
,因此队列永远不会变为空并且 q.join()
永远不会返回 .
我提出了以下但丑陋的解决方案,我通过队列为每个 Worker 发送一个 -1
退出指示器,这样每个 Worker 都可以看到它并自杀 . 但事实上,我必须为每个 Worker 发送一个退出指示器感觉有点难看 .
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
我有两个问题 .
-
为所有线程发送单个退出指示器的方法(如Martin James的第二个评论中所解释的)甚至可以工作吗?
-
如果上一个问题的答案是"No",有没有办法解决问题,我不必为每个工作线程发送单独的退出指示器?
4 回答
为了完整起见:你还可以将一个停止信号排入 - (线程计数) . 然后每个线程可以将其递增1并仅在停止信号为!= 0时重新排队 .
但我个人会用
Travis Mehlinger
或Daniel Sanchez
回答 .不要将其称为任务的特殊情况 .
使用Event代替,为您的员工提供非阻塞实施 .
Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?
正如您所注意到的那样,它无法正常工作,传播消息将使最后一个线程用另外一个项更新队列,因为您正在等待一个永远不会为空的队列,而不是您拥有的代码 .
If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?
你可以
join
线程而不是队列:因为Queue documentation解释
get
方法会在其空的时候引发一个execption,所以如果你已经知道要处理的数据,你可以填充队列然后垃圾邮件线程:在这里你有一个live example
要点是,
你创建一个
latch
,它将 open only after a certain counter went down ,当闩锁打开时,等待它的线程将被允许继续执行 .
我做了一个非常简单的例子,检查here类似于这样一个锁存器的例子: