首页 文章

Java BlockingQueue与批处理?

提问于
浏览
17

我对与Java BlockingQueue相同的数据结构感兴趣,但它必须能够批处理队列中的对象 . 换句话说,我希望 生产环境 者能够将对象放入队列,但是在 take() 上使用消费者块,直到队列达到一定的大小(批量大小) .

然后,一旦队列达到批量大小, 生产环境 者必须阻止 put() 直到消费者已经消耗了队列中的所有元素(在这种情况下, 生产环境 者将再次开始 生产环境 并且消费者块直到再次到达批次) .

是否存在类似的数据结构?或者我应该写它(我不介意),如果有什么东西,我只是不想浪费我的时间 .


UPDATE

也许稍微澄清一下事情:

情况总是如下 . 可以有多个 生产环境 者向队列添加项目,但永远不会有多个消费者从队列中获取项目 .

现在,问题是这些设置有多个并行和串行 . 换句话说, 生产环境 者为多个队列 生产环境 物品,而消费者本身也可以是 生产环境 者 . 这可以更容易地被视为 生产环境 者,消费者 - 生产环境 者和最终消费者的有向图 .

生产环境 者应该阻塞直到队列为空(@Peter Lawrey)的原因是因为每个都将在一个线程中运行 . 如果你让它们只是在空间可用的情况下生成,你最终会遇到太多线程试图同时处理太多东西的情况 .

也许将它与执行服务相结合可以解决问题?

4 回答

  • 1

    这听起来像RingBuffer在LMAX Disruptor模式中的工作方式 . 有关更多信息,请参见http://code.google.com/p/disruptor/ .

    一个非常粗略的解释是你的主要数据结构是RingBuffer . 生产环境 者按顺序将数据放入环形缓冲区,消费者可以提取 生产环境 者放入缓冲区的数据(基本上是批处理) . 如果缓冲区已满,则 生产环境 者将阻塞,直到使用者完成并释放缓冲区中的插槽 .

  • 2

    我建议你使用BlockingQueue.drainTo(Collection, int) . 您可以将它与take()一起使用,以确保获得最少数量的元素 .

    使用此方法的优点是您的批量大小随工作负载动态增长, 生产环境 者不必在消费者忙时阻止 . 即它自我优化延迟和吞吐量 .


    要完全按照要求实现(我认为这是一个坏主意),您可以使用带有繁忙消费线程的SynchronousQueue .

    即消费线程做了

    list.clear();
     while(list.size() < required) list.add(queue.take());
     // process list.
    

    生产环境 者将在消费者忙碌时阻止 .

  • 1

    这是一个快速(=简单但未完全测试)的实现,我认为可能适合您的请求 - 如果需要,您应该能够扩展它以支持完整的队列接口 .

    为了提高性能,您可以切换到ReentrantLock,而不是使用“synchronized”关键字 .

    public class BatchBlockingQueue<T> {
    
        private ArrayList<T> queue;
        private Semaphore readerLock;
        private Semaphore writerLock;
        private int batchSize;
    
        public BatchBlockingQueue(int batchSize) {
            this.queue = new ArrayList<>(batchSize);
            this.readerLock = new Semaphore(0);
            this.writerLock = new Semaphore(batchSize);
            this.batchSize = batchSize;
        }
    
        public synchronized void put(T e) throws InterruptedException {
            writerLock.acquire();
            queue.add(e);
            if (queue.size() == batchSize) {
                readerLock.release(batchSize);
            }
        }
    
        public synchronized T poll() throws InterruptedException {
            readerLock.acquire();
            T ret = queue.remove(0);
            if (queue.isEmpty()) {
                writerLock.release(batchSize);
            }
            return ret;
        }
    
    }
    

    希望你觉得它有用 .

  • 13

    不是我知道的 . 如果我理解正确,你想要 生产环境 者工作(当消费者被阻止时),直到它填满队列或消费者工作( 生产环境 者阻塞),直到它清除队列 . 如果是这种情况,我可能会建议您不需要数据结构,而是需要一种机制来阻止一方而另一方正在使用互斥锁 . 您可以为此锁定一个对象,并在内部具有full或empty的逻辑以释放锁并将其传递给另一方 . 总之,你应该自己写:)

相关问题