首页 文章

带有RabbitMQ的Apache Camel:当 endpoints 配置上的autoAck = false时,临时应答队列中的消息不被激活

提问于
浏览
2

在使用camel-rabbitmq扩展在Camel中配置支持InOut的路由时,我注意到了一个问题 . 当我将主队列配置设置为autoAck = false时,也会为临时应答队列复制相同的配置(它甚至使用相同的预取(5)设置,在RabbitMQ控制台中很容易看到) . 这会导致临时队列中的消息无限期地驻留在那里,直到服务器重新启动 .

Virtual host Name                         Features  State Ready Unacked Total incoming deliver / get ack
/test      amq.gen-Hkdx9mckIfMc6JhDI6d-JA AD Excl   idle    2   5   7   0.00/s 0.00/s   0.00/s
/test      amq.gen-eUU7BRI3Ooo4F8Me7HrPnA AD Excl   idle    2   5   7   0.00/s 0.00/s   0.00/s

即使在日志中我可以清楚地看到回复消息只是因为ack似乎没有被发送到RabbitMQ来清除临时队列中的消息 . 我已经在控制台中检查了两个临时队列都有消费者,所以我希望Camel能够发送ack .

o.a.c.c.r.RabbitMQMessagePublisher  - Sending message to exchange: emailfeedbackExchange with CorrelationId = Camel-ID-VMS-1534332570964-0-11
o.a.c.c.r.r.ReplyManagerSupport  - Received reply message with correlationID [Camel-ID-VMS-1534332570964-0-11]

The question is, how can I prevent this scenario while still keeping my autoAck=false and InOut capable route? 我可能会在这里提到没有错误等,流程按预期工作,电子邮件处理工作正常,唯一的问题是临时队列上的陈旧消息 .

我们的Camel版本是2.20.2这是我们拥有的所有Camel组件的相关Gradle配置:

compile ("org.apache.camel:camel-spring-boot-starter:${camelVersion}")
compile ("org.apache.camel:camel-rabbitmq:${camelVersion}")
compile ("org.apache.camel:camel-amqp:${camelVersion}")

队列和路由配置:

restentrypointroute:
    restEndpoint: /app
    postEndpoint: /email
    outputEmailEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true

emailroutebuilder:
    serviceName: emailroutebuilder
    inputEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true
    emailProcessor: bean:emailProcessor
    maximumRedeliveries: 5
    redeliveryDelay: 30000

以下是 RestRouteBuilder 实现的相关位:

@Override
public void configure() throws Exception {

restConfiguration().component("restlet").bindingMode(RestBindingMode.json);

    rest(restEndpoint).post(postEndpoint)
      .type(MyRequest.class)
      .route()
      .startupOrder(Integer.MAX_VALUE - 2)
      .process(this::process)
      .choice()
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.GENERATED)).to(outputEmailEndpoint)
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint).end()
      .endRest();

process()方法将 DELIVERYSTATUS_HEADER 标头添加到Camel交换机并验证有效负载 .

EmailRouteBuilder 看起来像这样:

public void configure() throws Exception {
    super.configure();

    from("direct:" + getServiceName())
            .to(emailProcessor)
            .process(ex -> {
                ex.setOut(ex.getIn());

            });

}

super.configure() 调用配置异常处理和死字,启动顺序,重试计数,最大重新交付等等 . 它将's quite a bit of code there but if you think something in there might be the cause of this issue I' ll发布 . 另外,如果您需要我添加任何其他配置,请告诉我 .

从上面可以清楚地看出为什么我们需要 InOut 路由 autoAck=false ,因为从业务角度来看,丢失的电子邮件很糟糕,而且REST客户端需要根据 EmailProcessor 如何开启来做出响应 . 如何摆脱临时队列中的陈旧消息?

EDIT 实际上路由只有在预取计数耗尽之后才能正常工作,之后它开始抛出异常并且REST客户端正在获取HTTP 500响应 .

org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-VMSYS119-1534407032085-0-284 not received on destination: amq.gen-eUU7BRI3Ooo4F8Me7HrPnA.

1 回答

  • 1

    根据评论,这被证明是camel-rabbitmq组件中的一个错误,现在修复已应用于主分支 .

    Jira门票在这里:https://issues.apache.org/jira/browse/CAMEL-12746

    该修复程序将在版本2.21.3,2.22.1,2.23.0及更高版本中提供 .

    Edit:

    包括答案中的代码更改 .

    TemporaryQueueReplyManager第139行 - 始终以 true 的自动确认模式启动temprary队列的使用者 .

    改变这个:

    private void start() throws IOException {         
        tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);     
    }
    

    对此:

    private void start() throws IOException {
         tag = channel.basicConsume(getReplyTo(), true, this);
     }
    

相关问题