我是Spring AMQP的新手 . 我有一个应用程序,它是一个 生产环境 者向另一个消费者的应用程序发送消息 .
消费者收到消息后,我们将对数据进行验证 .
If the data is proper we have to ACK and message should be removed from the Queue. If the data is improper we have to NACK(Negative Acknowledge) the data so that it will be re-queued in RabbitMQ .
我碰到
**factory.setDefaultRequeueRejected(false);**
(根本不会重新排列消息)
**factory.setDefaultRequeueRejected(true);**
(它会在发生异常时重新排队消息)
但我的情况是,我将基于验证确认该消息 . 然后它应该删除该消息 . 如果NACK然后重新排列该消息 .
我在RabbitMQ网站上看过
The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them
如何实现上述场景?请给我一些例子 .
我尝试了一个小程序
logger.info("Job Queue Handler::::::::::" + new Date());
try {
}catch(Exception e){
logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");
}
factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
return cause instanceof XMLException;
}));
消息不会为不同的异常重新排队 factory.setDefaultRequeueRejected(true)
09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1)org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler]部署的进程(SimpleAsyncTaskExecutor-1)从错误队列收到:
1 回答
见the documentation .
默认情况下,(使用
defaultRequeueRejected=true
)如果侦听器正常退出,则容器将响应消息(导致其被删除),或者如果侦听器抛出异常,则拒绝(并重新排队)它 .如果侦听器(或错误处理程序)抛出
AmqpRejectAndDontRequeueException
,则会覆盖默认行为并丢弃该消息(如果已配置,则路由到DLX / DLQ) - 容器调用basicReject(false)
而不是basicReject(true)
.因此,如果您的验证失败,请抛出
AmqpRejectAndDontRequeueException
. 或者,使用自定义错误处理程序配置侦听器,以将异常转换为AmqpRejectAndDontRequeueException
.这在this answer中有描述 .
如果您真的想要自己负责,请将确认模式设置为
MANUAL
,如果您使用@RabbitListener
,请使用ChannelAwareMessageListener
或this technique .但大多数人只是让容器处理事情(一旦他们了解正在发生的事情) . 通常,使用手动ack是针对特殊用例,例如延迟acks或早期acking .
EDIT
在我指出的答案中有一个错误(现在已经修复);你必须看看
ListenerExecutionFailedException
的原因 . 我刚测试了这个,它按预期工作...结果: