假设有1个 生产环境 者P和2个消费者C1和C2 . 并且有两个队列Q1和Q2,都具有特定的容量 .
P将 生产环境 物品并交替进入Q1和Q2 . 物品是为特定消费者 生产环境 的,不能被其他消费者消费 . 如何在Java中实现以下内容:在我启动3个线程后,如果Q1为空,则线程C1被阻塞,直到在Q1中存在某些内容时通知它 . Q2也是如此 . 当Q1和Q2都满时,P将被阻塞,直到Q1或Q2未满时通知 .
我正在考虑使用BlockingQueue,它会在队列为空时阻止使用者 . 但问题是当其中一个队列已满时, 生产环境 者将被阻止 . 我们可以使用Java中的任何数据结构来解决这个问题吗?
更新
我've got a solution myself, but I'我不确定它是否有效 . 我们仍然可以有2个BlockingQueues . 当消费者从其队列中获取项目时,它使用 BlockingQueue.take()
,因此当队列中没有项目时它将被阻止 . 当 生产环境 者将项目添加到任一队列时,它使用 BlockingQueue.offer()
. 因此,它永远不会被此操作阻止,并且如果队列已满,将获得'false' . 另外,我们保留一个AtomicInteger来指示未满的队列数 . 每当 生产环境 者P想要将一个项目放入队列时,如果它得到错误的返回,我们将AtomicInteger减少1.当它达到0时, 生产环境 者调用 AtomicInteger.wait()
. 每当消费者从其队列中获取项目时,它也会检查AtomicInteger . 当它为0时,消费者将其增加1并调用 AtomicInteger.notify()
.
请告诉我这个解决方案是否有意义 .
非常感谢!
3 回答
你考虑过Striped Executor Service . 这将使您能够解决您的问题 and 将您的消费者放入一个更高效的池中 .
您可以使用框架中的主题 .
在activemq http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html
在hornetq exact example for JMS Topic in HornetQ
无论您选择哪种数据结构/信息服务器,都可以使用其中任何一种资源 . 内存或磁盘空间总是有限制的 .
所以实际上 生产环境 者停止也不错 .
如果你的队列正在填满,你应该尝试恢复 balance :你可以添加更多的消费者 . 您可以改善消费者's performance. If this is not possible, something should really throttle the producer. It',以避免内存不足错误或设备上没有剩余空间 .
最后,无论如何,数据中心都有责任监控队列 . 如果您的队列填充程度达到限制,他们应该通知您,例如> 80% .
更新
如果 生产环境 者无法发送所有队列,因为其中一个队列已满,则由他来缓冲,但缓存是队列应该做的事情 .