首页 文章

如何在Spring AMQP中使用Ack或Nack

提问于
浏览
9

我是Spring AMQP的新手 . 我有一个应用程序,它是一个 生产环境 者向另一个消费者的应用程序发送消息 .

消费者收到消息后,我们将对数据进行验证 .

If the data is proper we have to ACK and message should be removed from the Queue. If the data is improper we have to NACK(Negative Acknowledge) the data so that it will be re-queued in RabbitMQ .

我碰到

**factory.setDefaultRequeueRejected(false);** (根本不会重新排列消息)

**factory.setDefaultRequeueRejected(true);** (它会在发生异常时重新排队消息)

但我的情况是,我将基于验证确认该消息 . 然后它应该删除该消息 . 如果NACK然后重新排列该消息 .

我在RabbitMQ网站上看过

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them

如何实现上述场景?请给我一些例子 .

我尝试了一个小程序

logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

消息不会为不同的异常重新排队 factory.setDefaultRequeueRejected(true)

09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1)org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler]部署的进程(SimpleAsyncTaskExecutor-1)从错误队列收到:

1 回答

  • 10

    the documentation .

    默认情况下,(使用 defaultRequeueRejected=true )如果侦听器正常退出,则容器将响应消息(导致其被删除),或者如果侦听器抛出异常,则拒绝(并重新排队)它 .

    如果侦听器(或错误处理程序)抛出 AmqpRejectAndDontRequeueException ,则会覆盖默认行为并丢弃该消息(如果已配置,则路由到DLX / DLQ) - 容器调用 basicReject(false) 而不是 basicReject(true) .

    因此,如果您的验证失败,请抛出 AmqpRejectAndDontRequeueException . 或者,使用自定义错误处理程序配置侦听器,以将异常转换为 AmqpRejectAndDontRequeueException .

    这在this answer中有描述 .

    如果您真的想要自己负责,请将确认模式设置为 MANUAL ,如果您使用 @RabbitListener ,请使用 ChannelAwareMessageListenerthis technique .

    但大多数人只是让容器处理事情(一旦他们了解正在发生的事情) . 通常,使用手动ack是针对特殊用例,例如延迟acks或早期acking .

    EDIT

    在我指出的答案中有一个错误(现在已经修复);你必须看看 ListenerExecutionFailedException 的原因 . 我刚测试了这个,它按预期工作...

    @SpringBootApplication
    public class So39530787Application {
    
        private static final String QUEUE = "So39530787";
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            template.convertAndSend(QUEUE, "foo");
            template.convertAndSend(QUEUE, "bar");
            template.convertAndSend(QUEUE, "baz");
            So39530787Application bean = context.getBean(So39530787Application.class);
            bean.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Expect 1 foo:"  + bean.fooCount);
            System.out.println("Expect 3 bar:"  + bean.barCount);
            System.out.println("Expect 1 baz:"  + bean.bazCount);
            context.close();
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                    t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
            return factory;
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE, false, false, true);
        }
        private int fooCount;
    
        private int barCount;
    
        private int bazCount;
    
        private final CountDownLatch latch = new CountDownLatch(5);
    
        @RabbitListener(queues = QUEUE)
        public void handle(String in) throws Exception {
            System.out.println(in);
            latch.countDown();
            if ("foo".equals(in) && ++this.fooCount < 3) {
                throw new FooException();
            }
            else if ("bar".equals(in) && ++this.barCount < 3) {
                throw new BarException();
            }
            else if ("baz".equals(in)) {
                this.bazCount++;
            }
        }
    
        @SuppressWarnings("serial")
        public static class FooException extends Exception { }
    
        @SuppressWarnings("serial")
        public static class BarException extends Exception { }
    
    }
    

    结果:

    Expect 1 foo:1
    Expect 3 bar:3
    Expect 1 baz:1
    

相关问题