首页 文章

Java - 多个队列生成者消费者

提问于
浏览
2

我有以下代码:

while(!currentBoard.boardIsValid()){
        for (QueueLocation location : QueueLocation.values()){
            while(!inbox.isEmpty(location)){
                Cell c = inbox.dequeue(location);
                notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
            }
        }
    }

我有一个有几个队列的消费者(他们所有的方法都是同步的) . 每个 生产环境 者一个队列 . 消费者遍历所有队列并检查他们是否有任务让他消费 . 如果他正在检查的队列中有一个任务,他就会消耗它 . 否则,他会检查下一个队列,直到他完成对所有队列的迭代 .

截至目前,如果他遍历所有队列并且它们都是空的,他继续循环而不是等待其中一个包含某些东西(如外部 while 所见) .

如何让消费者等到其中一个队列中有东西?

我遇到以下情况的问题:让我们说只有2个队列 . 消费者检查了第一个并且它是空的 . 正如他正在检查第二个(也是空的), 生产环境 者将东西放在第一个队列中 . 就消费者而言,队列都是空的,所以他应该等待(即使其中一个不再是空的,他应该继续循环) .

编辑:最后一件事 . 这对我来说是一个练习 . 我正在尝试自己实现同步 . 因此,如果任何一个java库都有一个实现这个的解决方案,我对它不感兴趣 . 我试图了解如何实现这一点 .

3 回答

  • 0

    如果要阻塞多个队列,那么一个选项是使用java的Lock and Condition objects and then use the signal method .

    因此,只要 生产环境 者有数据,它就应该调用 signallAll .

    Lock fileLock = new ReentrantLock();
    Condition condition = fileLock.newCondition();
    ...
    // producer has to signal
    condition.signalAll();
    ...
    // consumer has to await.
    condition.await();
    

    这种方式只有在提供信号时,消费者才会去检查队列 .

  • 1

    @Abe很接近 . 我会使用信号并等待 - 使用Object类内置函数,因为它们是最轻的权重 .

    Object sync = new Object();  // Can use an existing object if there's an appropriate one
    
    // On submit to queue
    synchronized ( sync ) {
        queue.add(...);  // Must be inside to avoid a race condition
        sync.notifyAll();
    }
    
    // On check for work in queue
    synchronized ( sync ) {
        item = null;
        while ( item == null ) {
            // Need to check all of the queues - if there will be a large number, this will be slow,
            // and slow critical sections (synchronized blocks) are very bad for performance
            item = getNextQueueItem();
            if ( item == null ) {
                sync.wait();
            }
        }
    }
    

    请注意 sync.wait 释放同步锁定,直到通知 - 并且需要锁定同步才能成功调用wait方法(这提醒程序员确实需要某种类型的关键部分才能使其可靠地工作) .

    顺便说一下,如果可行的话,我会推荐一个专门针对消费者(或消费者群体)的队列,而不是专用于 生产环境 者的队列 . 它将简化解决方案 .

  • 0

    我按照@Abe所建议的方式解决了类似的情况,但决定使用 SemaphoreAtomicBoolean 组合并将其称为BinarySemaphore . 它确实需要对 生产环境 者进行修改,以便在有事情要做时发出信号 .
    在BinarySemaphore的代码下面,以及消费者工作循环应该是什么样子的一般概念:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class MultipleProdOneConsumer {
    
    BinarySemaphore workAvailable = new BinarySemaphore();
    
    class Consumer {
    
        volatile boolean stop;
    
        void loop() {
    
            while (!stop) {
                doWork();
                if (!workAvailable.tryAcquire()) {
                    // waiting for work
                    try {
                        workAvailable.acquire();
                    } catch (InterruptedException e) {
                        if (!stop) {
                            // log error
                        }
                    }
                }
            }
        }
    
        void doWork() {}
    
        void stopWork() {
            stop = true;
            workAvailable.release();
        }
    }
    
    class Producer {
    
        /* Must be called after work is added to the queue/made available. */
        void signalSomethingToDo() {
            workAvailable.release();
        }
    }
    
    class BinarySemaphore {
    
        private final AtomicBoolean havePermit = new AtomicBoolean();
        private final Semaphore sync;
    
        public BinarySemaphore() {
            this(false);
        }
    
        public BinarySemaphore(boolean fair) {
            sync = new Semaphore(0, fair);
        }
    
        public boolean release() {
    
            boolean released = havePermit.compareAndSet(false, true);
            if (released) {
                sync.release();
            }
            return released;
        }
    
        public boolean tryAcquire() {
    
            boolean acquired = sync.tryAcquire();
            if (acquired) {
                havePermit.set(false);
            }
            return acquired;
        }
    
        public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException {
    
            boolean acquired = sync.tryAcquire(timeout, tunit);
            if (acquired) {
                havePermit.set(false);
            }
            return acquired;
        }
    
        public void acquire() throws InterruptedException {
    
            sync.acquire();
            havePermit.set(false);
        }
    
        public void acquireUninterruptibly() {
    
            sync.acquireUninterruptibly();
            havePermit.set(false);
        }
    
    }
    
    }
    

相关问题