首页 文章

阻塞队列和多线程消费者,如何知道何时停止

提问于
浏览
62

我有一个单线程生成器,它创建一些任务对象,然后添加到 ArrayBlockingQueue (具有固定大小) .

我也开始了一个多线程的消费者 . 这是一个固定的线程池( Executors.newFixedThreadPool(threadCount); ) . 然后我向这个threadPool提交了一些ConsumerWorker入口,每个ConsumerWorker都对上面提到的ArrayBlockingQueue实例进行了引用 .

每个这样的工作人员都会在队列中执行 take() 并处理该任务 .

我的问题是,当没有更多的工作要做时,让 Worker 知道的最佳方法是什么 . 换句话说,如何告诉Workers, 生产环境 者已经完成了对队列的添加,从这一点开始,每个工作人员在看到Queue为空时应该停止 .

我现在得到的是一个设置,其中我的Producer初始化了一个回调,当他完成它的工作(向队列中添加东西)时会触发回调 . 我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表 . 当Producer Callback告诉我 生产环境 者已完成时,我可以告诉每个 Worker . 此时,他们应该继续检查队列是否为空,当它变为空时,它们应该停止,从而允许我优雅地关闭ExecutorService线程池 . 就是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是我有一个明显的竞争条件,有时 生产环境 者将完成,发出信号,消费者工作者将在消耗队列中的所有内容之前停止 .

我的问题是,同步这个的最佳方法是什么,以便一切正常?我应该同步整个部分来检查 生产环境 者是否正在运行加上如果队列是空的加上从队列中取出一些块(在队列对象上)?我应该只在ConsumerWorker实例上同步 isRunning 布尔的更新吗?还有其他建议吗?

更新,这里是我最终使用的工作实施:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这很大程度上受到JohnVint在下面的回答的启发,只有一些小修改 .

===由于@ vendhan的评论而更新 .

谢谢你的观察 . 你是对的,这个问题的第一个代码片段(在其他问题中)是 while(isRunning || !inputQueue.isEmpty()) 没有真正意义的那个 .

在我实际的最终实现中,我做了一些更接近你更换“||”的建议 . (或)用“&&”(和),从某种意义上说,每个 Worker (消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是的话就停止了(理论上我们可以说 Worker 有要运行并且队列不能为空) .

6 回答

  • 81

    您应该从队列中继续 take() . 你可以使用毒丸告诉 Worker 停止 . 例如:

    private final Object POISON_PILL = new Object();
    
    @Override
    public void run() {
        //worker loop keeps taking en element from the queue as long as the producer is still running or as 
        //long as the queue is not empty:
        while(isRunning) {
            System.out.println("Consumer "+Thread.currentThread().getName()+" START");
            try {
                Object queueElement = inputQueue.take();
                if(queueElement == POISON_PILL) {
                     inputQueue.add(POISON_PILL);//notify other threads to stop
                     return;
                }
                //process queueElement
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    //this is used to signal from the main thread that he producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
    
  • 0

    我会给 Worker 们发一个特殊的工作包来表示他们应该关闭:

    public class ConsumerWorker implements Runnable{
    
    private static final Produced DONE = new Produced();
    
    private BlockingQueue<Produced> inputQueue;
    
    public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
        this.inputQueue = inputQueue;
    }
    
    @Override
    public void run() {
        for (;;) {
            try {
                Produced item = inputQueue.take();
                if (item == DONE) {
                    inputQueue.add(item); // keep in the queue so all workers stop
                    break;
                }
                // process `item`
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    }

    要停止工作程序,只需将 ConsumerWorker.DONE 添加到队列中即可 .

  • 1

    在您尝试从队列中检索元素的代码块中,使用 poll(time,unit) 而不是 take() .

    try { 
        Object queueElement = inputQueue.poll(timeout,unit);
         //process queueElement        
     } catch (InterruptedException e) {
            if(!isRunning && queue.isEmpty())
             return ; 
     }
    

    通过指定适当的超时值,可以确保线程不会阻塞,以防有一个不幸的序列

    • isRunning 是真的

    • 队列变空,因此线程进入阻塞等待(如果使用 take()

    • isRunning 设置为false

  • 14

    您可以使用许多策略,但一个简单的策略是使用任务的子类来表示作业的结束 . 制作人不直接发送此信号 . 相反,它将此任务子类的实例排入队列 . 当您的某个消费者完成此任务并执行该任务时,会导致信号被发送 .

  • 0

    我不得不使用多线程 生产环境 者和多线程消费者 . 我最终得到了一个 Scheduler -- N Producers -- M Consumers 方案,每个方案都通过队列进行通信(总共两个队列) . 调度程序使用生成数据的请求填充第一个队列,然后使用N "poison pills"填充它 . 有一个活跃的 生产环境 者计数器(原子int),最后一个接收最后一个毒丸的 生产环境 者将M毒丸送到消费者队列 .

  • 0

    我们不能使用 CountDownLatch 来做,其中size是 生产环境 者中的记录数 . 每个消费者都会在处理完记录之后 countDown . 当所有任务完成时,它会跨越 awaits() 方法 . 然后停止所有你的消费者 . 随着所有记录的处理 .

相关问题