首页 文章

记录Spring AMQP的消息侦听器抛出的异常

提问于
浏览
1

我有一个使用异步使用者处理消息的应用程序(通过@RabbitListener) . 在此使用者方法中,由于我定义的策略而发生异常并且消息被重新排队:

spring:
    rabbitmq:
        listener:
            simple:
                default-requeue-rejected: false
                retry:
                    enabled: true
                    max-attempts: 10
                    initial-interval: 60000 # a minute
                    multiplier: 2
                    max-interval: 600000 # 10 minutes

consumer方法调用一个私有方法,该方法以递归方式从DB中提取数据并使用RabbitTemplate推送到队列中 . 我预计此队列中大约有200条消息,但它会上升到大约700k,然后由于重试策略耗尽而导致消费者线程停止 .

问题是我找不到任何记录异常的地方,因此我无法理解业务逻辑的哪个部分导致了这个问题 . 我可以尝试将整个函数放入try / catch块并记录问题,然后重新抛出它以进行Spring AMQP的异常处理,但我想知道是否存在更好的方法 .

我的项目具有以下依赖项:

Spring Boot: 1.5.9.RELEASE
Spring AMQP: 1.7.4.RELEASE
RabbitMQ: 3.7.2

1 回答

  • 0

    我们应该尝试让它更容易添加 RetryListener ,但你现在可以通过更换重试拦截器来实现,如下所示......

    @SpringBootApplication
    public class So48331502Application {
    
        private static final Logger logger = LoggerFactory.getLogger(So48331502Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So48331502Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitListenerEndpointRegistry registry,
                RabbitProperties properties, Advice interceptor) {
            return args -> {
                ListenerRetry retry = properties.getListener().getSimple().getRetry();
                if (retry.isEnabled()) {
                    SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) registry
                            .getListenerContainer("myListener");
                    container.setAdviceChain(interceptor);
                }
                registry.start();
            };
        }
    
        @Bean
        public StatelessRetryOperationsInterceptorFactoryBean interceptor(RabbitProperties properties) {
            ListenerRetry retry = properties.getListener().getSimple().getRetry();
            RetryTemplate retryTemplate = new RetryTemplate();
            RetryPolicy retryPolicy = new SimpleRetryPolicy(retry.getMaxAttempts());
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(retry.getInitialInterval());
            backOffPolicy.setMultiplier(retry.getMultiplier());
            backOffPolicy.setMaxInterval(retry.getMaxInterval());
            retryTemplate.setRetryPolicy(retryPolicy);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            retryTemplate.registerListener(
                new RetryListener() {
    
                    @Override
                    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                        return true;
                    }
    
                    @Override
                    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                            Throwable throwable) {
                        if (throwable != null) {
                            logger.info("Failed: Retry count " + context.getRetryCount(), throwable);
                        }
                    }
    
                    @Override
                    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                            Throwable throwable) {
                        logger.info("Retry count " + context.getRetryCount(), throwable);
                    }
                });
            StatelessRetryOperationsInterceptorFactoryBean interceptor =
                    new StatelessRetryOperationsInterceptorFactoryBean();
            interceptor.setRetryOperations(retryTemplate);
            return interceptor;
        }
    
        @RabbitListener(id="myListener", queues = "one")
        public void in(Object in) {
            throw new RuntimeException();
        }
    
    }
    

    请注意,您必须将 auto-startup 设置为false,以便更改建议链...

    spring:
      rabbitmq:
        listener:
          simple:
            auto-startup: 'false'
            retry:
              enabled: 'true'
    

    然后启动注册表,它将启动所有容器 .

相关问题