public class Multiplexer<M> {
private final List<BlockingQueue<M>> consumers
= new CopyOnWriteArrayList<BlockingQueue<M>>();
public void publish(M msg) {
for (BlockingQueue<M> q : consumers) {
q.offer(msg);
}
}
public void addConsumer(BlockingQueue<M> consumer) {
consumers.add(consumer);
}
}
4 回答
基本上你在谈论多路复用,并且标准库中没有任何东西,但创建一个非常简单 . 假设您的客户对订阅之前发布的消息不感兴趣,那么您需要为每个消费者提供一个队列池,并且发布只是将项目提供给每个队列:
此版本允许消费者使用他们可能想要的任何阻塞队列实现 . 如果需要,您显然可以为客户端提供标准实现和良好的界面 .
第三个条件不是普通的java,但是你可以为每个使用者使用一个单独的头部nonblocking linked queue(你可以依靠GC来收集未引用的节点)
最简单的策略是将消息传递给每个消费者,我不会有这么多的消费者,消费者的数量很重要 . 您可以在几微秒内向几十个消费者添加消息 .
避免这种情况的一种方法是使用具有许多读取器的圆环缓冲器 . 这实现起来很棘手,意味着消费者将受限于他们可以拥有的消息来源数量 .
只有一个伪消费者,让真正的消费者注册伪消费者 . 当 生产环境 者发送消息时,伪消费者醒来并消费该消息 . 在消费消息时,伪消费者为每个注册的消费者创建一个单独的
Runnable
并在线程池上执行它们 .