我有2个应用程序使用RabbitMQ交换数据 . 我使用Spring AMQP实现了这一点 . 我有一个场景,一旦消费者消费消息可能会在处理时遇到异常 .
如果有任何异常,我打算登录数据库 . 一旦消息到达消费者,我必须显式地从队列中删除消息,无论是成功处理还是遇到错误 .
如何强制从队列中删除消息,否则如果我的应用程序无法处理它将会在那里?
下面是我的Listener代码
@RabbitListener(containerFactory="rabbitListenerContainerFactory",queues=Constants.JOB_QUEUE)
public void handleMessage(JobListenerDTO jobListenerDTO) {
//System.out.println("Received summary: " + jobListenerDTO.getProcessXML());
//amqpAdmin.purgeQueue(Constants.JOB_QUEUE, true);
try{
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("initiator", "cmy5kor");
Deployment deploy = repositoryService.createDeployment().addString(jobListenerDTO.getProcessId()+".bpmn20.xml",jobListenerDTO.getProcessXML()).deploy();
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(jobListenerDTO.getProcessId(), variables);
System.out.println("Process Instance is:::::::::::::"+processInstance);
}catch(Exception e){
e.printStackTrace();
}
配置代码
@Configuration
@EnableRabbit
public class RabbitMQJobConfiguration extends AbstractBipRabbitConfiguration {
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setQueue(Constants.JOB_QUEUE);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public Queue jobQueue() {
return new Queue(Constants.JOB_QUEUE);
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("com.bosch.diff.approach.TaskMessage", JobListenerDTO.class);
classMapper.setIdClassMapping(idClassMapping);
messageConverter.setClassMapper(classMapper);
factory.setMessageConverter(messageConverter);
factory.setReceiveTimeout(10L);
return factory;
}
}
2 回答
我不知道关于rmq的spring api或配置,但是这个
正是在设置自动确认标志时发生的事情 . 通过这种方式,消息一旦被消耗就被确认 - 因此从队列中消失 .
只要您的侦听器捕获异常,就会从队列中删除该消息 .
如果您的侦听器抛出异常,则默认情况下将重新排队;可以通过抛出
AmqpRejectAndDontRequeueException
或设置defaultRequeueRejected
属性来修改该行为 - 请参阅the documentation for details .