首页 文章

消息不会在抛出异常时重新排队

提问于
浏览
0

我在用

Spring Integration 4.1.2.RELEASE

Spring AMQP 1.4.3.RELEASE

抛出异常时,我期待消息被重新排队 . 我相信我已经满足了所有条件,因为我从这里收集了post

在帖子中,我了解到确保重新排队有三个条件:1) acknowledge-mode 必须是AUTO . 2) requeue-rejected 必须为TRUE . 3)抛出任何异常但是 AmqpRejectAndDontRequeueException .

我相信我在一些测试代码中遇到了这些条件:

Configuration

服务激活器是抛出不从 AmqpRejectAndDontRequeueException 继承的异常的地方

@Autowired
public void setSpringIntegrationConfigHelper (SpringIntegrationHelper springIntegrationConfig) {
    this.springIntegrationConfigHelper = springIntegrationConfig;   
}

@Bean
public String priorityPOCQueueName() {
    return "poc.priority";
}

@Bean
public Queue priorityPOCQueue(RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(priorityPOCQueueName(), true);
    rabbitAdmin.declareQueue(queue);
    return queue;
}

@Bean
public Binding priorityPOCQueueBinding(RabbitAdmin rabbitAdmin) {
    Binding binding = new Binding(priorityPOCQueueName(),
                                  DestinationType.QUEUE,
                                  "amq.direct",
                                  priorityPOCQueue(rabbitAdmin).getName(),
                                  null);
    rabbitAdmin.declareBinding(binding);
    return binding;
}

@Bean
public AmqpTemplate priorityPOCMessageTemplate(ConnectionFactory amqpConnectionFactory,
                                                @Qualifier("priorityPOCQueueName") String queueName,
                                                @Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    RabbitTemplate template = new RabbitTemplate(amqpConnectionFactory);
    template.setChannelTransacted(false);
    template.setExchange("amq.direct");
    template.setQueue(queueName);
    template.setRoutingKey(queueName);
    template.setMessageConverter(messageConverter);
    return template;
}


@Autowired
@Qualifier("priorityPOCQueue")
public void setPriorityPOCQueue(Queue priorityPOCQueue) {
    this.priorityPOCQueue = priorityPOCQueue;
}

@Bean(name="exec.priorityPOC")
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
    e.setCorePoolSize(1);
    e.setQueueCapacity(1);
    return e;
}

@Bean(name="poc.priorityChannel")
public MessageChannel pocPriorityChannel() {
    PriorityChannel c = new PriorityChannel(new PriorityComparator());
    c.setComponentName("poc.priorityChannel");
    c.setBeanName("poc.priorityChannel");
    return c;
}

@Bean(name="poc.inboundChannelAdapter") //make this a unique name
public AmqpInboundChannelAdapter amqpInboundChannelAdapter(@Qualifier("exec.priorityPOC") TaskExecutor taskExecutor
        , @Qualifier("poc.errorChannel") MessageChannel pocErrorChannel) {

    int concurrentConsumers = 1;
    AmqpInboundChannelAdapter a =  mimediaSpringIntegrationConfigHelper.createInboundChannelAdapter(taskExecutor
            , pocPriorityChannel(), new Queue[]{priorityPOCQueue},  concurrentConsumers);
    a.setErrorChannel(pocErrorChannel);
    return a;

}

@ServiceActivator(inputChannel="poc.priorityChannel")
public void processUserFileCollectionAudit(@Header(SimulateErrorHeaderPostProcessor.ERROR_SIMULATE_HEADER_KEY) Boolean simulateError, PriorityMessage priorityMessage) throws InterruptedException {
    if (isFirstMessageReceived == false) {
        Thread.sleep(15000); //Cause a bit of a backup so we can see prioritizing in action.
        isFirstMessageReceived = true;
    }
    logger.debug("Received message with priority: " + priorityMessage.getPriority() + ", simulateError: " + simulateError +  ", Current retry count is "
        + priorityMessage.getRetryCount());
    if (simulateError && priorityMessage.getRetryCount() < PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
        logger.debug(" Simulating an error and re-queue'ng. Current retry count is " + priorityMessage.getRetryCount());
        priorityMessage.setRetryCount(priorityMessage.getRetryCount() + 1);
        throw new NonAdequateResourceException();
    } else if (simulateError && priorityMessage.getRetryCount() >= PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
        logger.debug(" Max retry count exceeded");
    }
}

SpringIntegrationHelper

这是自动确认和重新排队被拒绝的地方 .

protected ConnectionFactory connectionFactory;
protected MessageChannel errorChannel;
protected MessageConverter messageConverter;

@Autowired
public void setConnectionFactory (ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

@Autowired
public void setErrorChannel(MessageChannel errorChannel) {
    this.errorChannel = errorChannel;
}

@Autowired
public void setMessageConverter(@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}

public AmqpInboundChannelAdapter createInboundChannelAdapter(TaskExecutor taskExecutor
        ,  MessageChannel outputChannel, Queue[] queues, int concurrentConsumers) {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    //AUTO is default, but setting it anyhow.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    listenerContainer.setAutoStartup(true);
    listenerContainer.setConcurrentConsumers(concurrentConsumers);
    listenerContainer.setMessageConverter(messageConverter);
    listenerContainer.setQueues(queues);
    //listenerContainer.setChannelTransacted(false);
    listenerContainer.setPrefetchCount(100);
    listenerContainer.setTaskExecutor(taskExecutor);
    listenerContainer.setDefaultRequeueRejected(true);



    AmqpInboundChannelAdapter a = new AmqpInboundChannelAdapter(listenerContainer);
    a.setMessageConverter(messageConverter);
    a.setAutoStartup(true);
    //TODO This was stopping my custom error handler. Fix. a.setErrorChannel(errorChannel);
    a.setHeaderMapper(MimediaAmqpHeaderMapper.createPassAllHeaders());
    a.setOutputChannel(outputChannel);
    return a;
}

为什么我的邮件没有重新排队?

1 回答

  • 1

    4)必须在侦听器的线程上抛出异常 .

    只有在收到消息的侦听器容器线程上抛出异常时,它才会起作用 .

    由于您使用 PriorityChannel 作为适配器的输出通道,因此您立即将消息传递给另一个线程,因此侦听器线程上没有异常,并且消息始终是确认的 .

相关问题