我想从RabbitMq队列中消耗我的Storm Spout中的消息 .
现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息 .
Spring AMQP提供机制(创建监听器或使用注释@RabbitListner)从队列中读取消息 .
问题是我可以让一个监听器从队列中读取消息 . 但是如何将此消息发送到在Storm集群上运行的Storm Spout?
拓扑将启动一个集群,但在我的spout的nextTuple()方法中,我需要从此Queue中读取消息 . 可以在这里使用Spring AMQP吗?
我有一个侦听器配置为从队列中读取消息:
@RabbitListener(queues = "queueName")
public void processMessage(QueueMessage message) {
}
如何将收听者收到的上述消息发送到我在集群上运行的喷口 .
或者,spout的nextTuple()方法如何在其中包含此方法?可能吗
我在这里使用Java作为语言 .
1 回答
您可以使用
RabbitTemplate
receive
或receiveAndConvert
方法之一按需读取消息(而不是消息驱动) .默认情况下,如果队列中没有消息,它们将返回null .
EDIT :
如果设置
receiveTimeout
(在1.5或更高版本中可用),则接收方法将在此时阻止(它在内部使用异步使用者并且不进行轮询) .但它仍然没有听众那么高效,因为为每个方法创建了一个新的消费者;要使用监听器,您需要在
nextTuple()
(例如BlockingQueue
)中使用一些内部阻塞机制来等待消息 .