首页 文章

如何在多线程 生产环境 者 - 消费者模式中完成工作后退出工作线程?

提问于
浏览
14

我正在尝试使用Python 2.7中的Queue.Queue实现多线程 生产环境 者 - 消费者模式 . 我试图找出如何使消费者,即 Worker 线程,一旦完成所有必要的工作,停止 .

请参阅Martin James对此答案的第二条评论:https://stackoverflow.com/a/19369877/1175080

发送'我完成'任务,指示池线程终止 . 任何获得此类任务的线程都会重新排队,然后自杀 .

但这对我不起作用 . 例如,请参阅以下代码 .

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

在所有三个工作人员都已从队列中读取退出指示符(即 -1 )之后,此程序挂起,因为每个工作程序在退出之前重新排队 -1 ,因此队列永远不会变为空并且 q.join() 永远不会返回 .

我提出了以下但丑陋的解决方案,我通过队列为每个 Worker 发送一个 -1 退出指示器,这样每个 Worker 都可以看到它并自杀 . 但事实上,我必须为每个 Worker 发送一个退出指示器感觉有点难看 .

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

我有两个问题 .

  • 为所有线程发送单个退出指示器的方法(如Martin James的第二个评论中所解释的)甚至可以工作吗?

  • 如果上一个问题的答案是"No",有没有办法解决问题,我不必为每个工作线程发送单独的退出指示器?

4 回答

  • 3

    为了完整起见:你还可以将一个停止信号排入 - (线程计数) . 然后每个线程可以将其递增1并仅在停止信号为!= 0时重新排队 .

    if data < 0: # negative numbers are used to indicate that the worker should stop
            if data < -1:
                q.put(data + 1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break
    

    但我个人会用 Travis MehlingerDaniel Sanchez 回答 .

  • 11

    不要将其称为任务的特殊情况 .

    使用Event代替,为您的员工提供非阻塞实施 .

    stopping = threading.Event()
    
    def worker(n, q, timeout=1):
        # run until the master thread indicates we're done
        while not stopping.is_set():
            try:
                # don't block indefinitely so we can return to the top
                # of the loop and check the stopping event
                data = q.get(True, timeout)
            # raised by q.get if we reach the timeout on an empty queue
            except queue.Empty:
                continue
            q.task_done()
    
    def master():
        ...
    
        print 'waiting for workers to finish'
        q.join()
        stopping.set()
        print 'done'
    
  • 16

    Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?

    正如您所注意到的那样,它无法正常工作,传播消息将使最后一个线程用另外一个项更新队列,因为您正在等待一个永远不会为空的队列,而不是您拥有的代码 .

    If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?

    你可以 join 线程而不是队列:

    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            data = q.get()
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
            if data == -1: # -1 is used to indicate that the worker should stop
                # Requeue the exit indicator.
                q.put(-1)
                # Commit suicide.
                print 'worker', n, 'is exiting'
                break
    
    def master():
        # master() sends data to worker() via q.
        q = Queue.Queue()
    
        # Create 3 workers.
        threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
        for t in threads:
            threads.start()
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
            time.sleep(0.5)
    
        # Send an exit indicator for all threads to consume.
        q.put(-1)
    
        print 'waiting for workers to finish ...'
        for t in threads:
            t.join()
        print 'done'
    
    master()
    

    因为Queue documentation解释 get 方法会在其空的时候引发一个execption,所以如果你已经知道要处理的数据,你可以填充队列然后垃圾邮件线程:

    import Queue
    import threading
    import time
    
    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            try:
                data = q.get(block=False, timeout=1)
                print 'worker', n, 'got', data
                time.sleep(1)  # Simulate noticeable data processing time
                q.task_done()
            except Queue.Empty:
                break
    
    
    def master():
        # master() sends data to worker() via q.
        q = Queue.Queue()
    
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
    
        # Create 3 workers.
        for i in range(3):
            t = threading.Thread(target=worker, args=(i, q))
            t.start()
    
        print 'waiting for workers to finish ...'
        q.join()
        print 'done'
    
    master()
    

    在这里你有一个live example

  • 4

    除了@DanielSanchez出色的答案,我建议实际上依赖于类似Java CountDownLatch的机制 .

    要点是,

    • 你创建一个 latch ,它将 open only after a certain counter went down

    • 当闩锁打开时,等待它的线程将被允许继续执行 .

    • 我做了一个非常简单的例子,检查here类似于这样一个锁存器的例子:

    import threading
    import Queue
    import time
    
    WORKER_COUNT = 3
    latch = threading.Condition()
    count = 3
    
    def wait():
        latch.acquire()
        while count > 0:
            latch.wait()
        latch.release()
    
    def count_down():
        global count
        latch.acquire()
        count -= 1
        if count <= 0:
            latch.notify_all()
        latch.release()
    
    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            data = q.get()
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
            if data == -1: # -1 is used to indicate that the worker should stop
                # Requeue the exit indicator.
                q.put(-1)
                # Commit suicide.
                count_down()
                print 'worker', n, 'is exiting'
                break
    
    # master() sends data to worker() via q.  
    
    def master():
        q = Queue.Queue()
    
        # Create 3 workers.
        for i in range(WORKER_COUNT):
            t = threading.Thread(target=worker, args=(i, q))
            t.start()
    
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
            time.sleep(0.5)
    
        # Send an exit indicator for all threads to consume.
        q.put(-1)
        wait()
        print 'done'
    
    master()
    

相关问题