首页 文章

Spring AMQP Consumer神秘地丢弃了与队列的连接

提问于
浏览
1

我们使用的是spring-amqp 1.5.2,RabbitMQ版本为3.5.3 . 所有队列都运行良好,我们让消费者在没有任何问题的情况下听取消息,除了一个消费者不断神秘地断开连接 . spring-amqp auto恢复,但几个小时后消费者断开连接,再也没有回来 .

队列声明为

@Bean()
public Queue analyzeTransactionsQueue(){
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000);
    return new Queue("analyze.txns", true, false, false, args);
}

其他队列以类似的方式声明,没有问题 .

消费者(监听者)被声明为

@Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(2);
    factory.setMaxConcurrentConsumers(4);
    factory.setTaskExecutor(asyncTaskExecutor);
    ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue;
        }
    };
    factory.setConsumerTagStrategy(consumerTagStrategy);
    return factory;
}

同样,其他没有问题的消费者也以类似的方式宣布 .

收到消息后的代码没有例外 . 即使在为SimpleMessageListenerContainer打开DEBUG日志记录之后,日志中也没有错误 .

LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0; 
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@10.17.1.13:5672/,47), acknowledgeMode=AUTO local queue size=0;

关于为什么会发生这种情况的任何想法 . 尝试过DEBUG日志,但无济于事 .

2 回答

  • 0

    我观察到的一件事是,如果在解析过程中出现异常,消费者会断开连接并且它并不总是记录问题,具体取决于您的日志配置...

    从那时起,我总是将handleDelivery方法包装到try catch中,以获得更好的日志记录并且没有连接丢弃:

    consumer = new DefaultConsumer(channel) {
    
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
    
                log.info("processing message - content : " + new String(body, "UTF-8"));
    
               try {
                    MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
                    processMyEvent(myEvent);
    
                } catch (Exception exp) {
                    log.error("couldn't process "+MyEvent.class+" message : ", exp);
                }
            }
        };
    
  • 0

    看看你配置的方式,很明显你已经启用了动态扩展消费者 .

    factory.setConcurrentConsumers(2);
    factory.setMaxConcurrentConsumers(4);

    有一个线程问题,我提交了一个修复程序,导致消费者数量降至零 . 这种情况在消费者缩减时发生 .

    从它的外观来看,你一直是这个问题的受害者 . 该修复程序已被后端移植我相信并且可以看到here

    尝试使用最新版本,看看是否遇到同样的问题 .

相关问题