我正在处理大容量流〜每秒500 msgs,使用带有10个并发消费者的SimpleMessageListenerContainer从Spring AMQP Rabbit消耗数据,我必须每隔15分钟对Db进行一些检查并重新加载某些属性进行处理,这是用石英触发器完成,每15分钟触发一次,停止SimplelistenerContainer,完成必要的工作并再次启动Container .
当应用程序启动时,一切都运行良好,当触发器触发并且Container重新启动时,我看到多次传递相同的消息,这会导致大量重复 . 消费者没有抛出任何消息 .
The Message Listener
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
During app startup set up parallel consumers and start the consumer
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
The quartz trigger
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}
1 回答
您的消息现在看起来像消费者一样 . 如果您没有使用自动确认模式,则需要自己确认消息(也可以在SimpleMessageListenerContainer配置) . 否则,代理会假定邮件未成功处理并尝试再次传递 .