我们使用的是spring-amqp 1.5.2,RabbitMQ版本为3.5.3 . 所有队列都运行良好,我们让消费者在没有任何问题的情况下听取消息,除了一个消费者不断神秘地断开连接 . spring-amqp auto恢复,但几个小时后消费者断开连接,再也没有回来 .
队列声明为
@Bean()
public Queue analyzeTransactionsQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);
return new Queue("analyze.txns", true, false, false, args);
}
其他队列以类似的方式声明,没有问题 .
消费者(监听者)被声明为
@Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setTaskExecutor(asyncTaskExecutor);
ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue;
}
};
factory.setConsumerTagStrategy(consumerTagStrategy);
return factory;
}
同样,其他没有问题的消费者也以类似的方式宣布 .
收到消息后的代码没有例外 . 即使在为SimpleMessageListenerContainer打开DEBUG日志记录之后,日志中也没有错误 .
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0;
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0;
关于为什么会发生这种情况的任何想法 . 尝试过DEBUG日志,但无济于事 .
2 回答
我观察到的一件事是,如果在解析过程中出现异常,消费者会断开连接并且它并不总是记录问题,具体取决于您的日志配置...
从那时起,我总是将handleDelivery方法包装到try catch中,以获得更好的日志记录并且没有连接丢弃:
看看你配置的方式,很明显你已经启用了动态扩展消费者 .
有一个线程问题,我提交了一个修复程序,导致消费者数量降至零 . 这种情况在消费者缩减时发生 .
从它的外观来看,你一直是这个问题的受害者 . 该修复程序已被后端移植我相信并且可以看到here
尝试使用最新版本,看看是否遇到同样的问题 .