首页 文章

消费者线程等待多个qeueues

提问于
浏览
2

我有多个任务队列和一个 consumer 线程 .

一旦任何队列有任务,就应该唤醒线程 . 所以我想知道进行这种沟通的最佳方式是什么 .

这些是一些可以解决这个问题的实现,并解释了为什么我想找到不同的东西:

  • 实现它的一种方法是使用一些监视器对象并从消费者线程调用 .wait(timeout) 并从 生产环境 者线程调用 .notify() . 但是这种方法使用 wait/notify 这是一种低级api,所以我尽量避免使用它 . 此外,它并不总是正确的,在某些情况下,我们可能最终等待整个超时,而我们有任务要做(睡觉理发师问题) .

  • CountDownLatchreset 方法之类的东西会很好,但我在 java.util.concurrent 中找不到类似的东西 . 实施将相当简单,但实施新自行车是我想要避免的更多的事情 wait/notify . 另外我认为它与等待整个超时的问题一样 wait/notify .

  • 使生成器将创建的实体包装成某些 Task 并使所有生成器写入同一队列,以便使用者可以侦听单个队列 . 在我认为的大多数情况下,这种方法实际上非常好,但在我的情况下,这部分应用程序具有低延迟要求,因此我必须避免创建新对象(例如这些包装器)并且还会增加队列尾部的争用(而不是一个消费者,所有人都会在那里写)这对延迟也不是很好 .

那么有没有其他方法来实现它?(可能使用其他一些并发原语)

1 回答

  • 1

    如何使用任务通知队列,在这种情况下,如果任何任务队列添加和项目,它也将项目添加到ntification队列 .

    以下剪辑说明了这种方法:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    
    public class Main<T1, T2>  {
    
    
      Queue<T1> taskType1Queue = new ArrayBlockingQueue<T1>(10);
      Queue<T2> taskType2Queue= new ArrayBlockingQueue<T2>(10);
      ArrayBlockingQueue<Boolean> notificationQueue= new ArrayBlockingQueue<Boolean>(2);
    
    
      public void produceType1(T1 task) {
        new Thread(new Runnable() {
          @Override
          public void run() {
            taskType1Queue.add(task);
            notificationQueue.offer(true);; //does not block if full
          }
        });
      }
    
      public void produceType2(T2 task) {
        new Thread(new Runnable() {
          @Override
          public void run() {
            taskType2Queue.add(task);
            notificationQueue.offer(true); //does not block if full
          }
        });
      }
    
    
      public void consume() {
    
        try {
          notificationQueue.take();//wait till task1 o task2 has been published
    
          for(;!Thread.currentThread().isInterrupted();){
            T1 task1 = taskType1Queue.poll();//does not block if queue is empty
            if (task1 != null) {
              //do something
            }
            T2 task2 = taskType2Queue.poll();//does not block if queue is empty
            if (task2 != null) {
              //do something
            }
            if(task1 == null && task2 == null) {
              break;
            }
          }
    
        } catch (InterruptedException e) {
          System.out.println("Consumer thread done");
          return;
        }
    
      }
    
    }
    

相关问题