首页 文章

如何将JMS重新传递到我的Camel Route中,但仍允许删除邮件

提问于
浏览
1

我有一个Camel Route,它从ActiveMQ JMS队列中读取,进行一些处理,并将结果传递给远程目标 . 如果此进程的通信部分失败,我想无限期地重试(直到目标为"up") . 我可以通过制作路线 transacted 并设置 redeliveryPolicy 来处理这个问题 . 重新传递在Camel中设置(通过 camel.processor.RedeliveryPolicy 重新尝试路由的失败部分)和ActiveMQConnectionFactory(通过 org.apache.activemq.RedeliveryPolicy 重新尝试整个路由) .

我还要求我们应该能够从JMS队列中删除一个条目(我通过一个通过JMX与ActiveMQ对话的应用程序),并且处理应该转移到下一条消息 .

问题是我可以允许删除消息(通过将消费者设置为 cacheLevelName=CACHE_NONE ),或者让连接句柄重试(通过将消费者设置为 cacheLevelName=CACHE_CONSUMER ),但不能同时重置两者 .

这是我目前的设置:

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="${jms.brokerUrl}" />
  <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="all" value="0"/>
    </bean>
  </property>
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="transacted" value="true"/>
  <property name="concurrentConsumers" value="1"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="configuration" ref="jmsConfig" />
</bean>

<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <property name="initialRedeliveryDelay" value="5000" />
  <property name="redeliveryDelay" value="5000" />
  <property name="maximumRedeliveries" value="-1" />
  <property name="queue" value=">" />
</bean>

<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="2"/>
  <property name="redeliveryDelay" value="500"/>
</bean>

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <constructor-arg>
    <bean class="org.springframework.transaction.support.TransactionTemplate">
      <property name="transactionManager" ref="jmsTransactionManager"/>
    </bean>
  </constructor-arg>
</bean>

<camel:camelContext id="testContext">
  <camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>

通过一个非常简单的路线:

@Component
public class TransactedRoute extends SpringRouteBuilder {
  @Override
  public void configure() {
    onException(Exception.class) // we would handle comms exceptions here
      .redeliveryPolicyRef("redeliveryProfile")
      .rollback();

    from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
      .transacted("PROPAGATION_REQUIRED")
      .bean(Bean1.class)
      .to(DESTINATION); // this could throw a comms exception
  }
}

使用此设置,路由从 jms:queue:myqueue 读取,并且在Comms异常的情况下,Camel在内部重试路径 to 部分两次( redeliveryProfile ) . 然后在5秒延迟后,消息再次通过整个路径发送( amqRedeliveryPolicy ) . 这一直持续到我停止测试 .

但是,如果我从ActiveMQ中删除该消息,它将继续由路由处理,尽管它不再在队列中 .

如果我将消费者改为:

from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")

我现在可以从ActiveMQ中删除该消息并且路由停止处理它......但忽略 amqRedeliveryPolicy ,立即重试消息(没有5秒延迟)并且在6次尝试(AMQ的默认值)之后将其放入死亡信件队列 .

那么,有没有办法通过修改我的配置来实现这两者?

或者我完全错过了沿线的某个地方?

1 回答

  • 0

    如果您需要人工干预以通过JMX删除队列中的特定消息,那听起来有点像糟糕的设计 .

    也许你可以让AMQ在X次尝试失败后将消息移动到DLQ - 永远重新发送也是一个糟糕的设计 . 但是如果6次尝试很少,你可以将默认值增加到更高的值 .

    然后,您可以从DLQ队列检查消息,并尝试了解消息失败的原因,并始终可以安全地通过JMX或其他工具删除或清除DLQ队列 .

    AMQ允许每个队列具有DLQ,您可以将其配置为使用DLQ前缀等,而不是一个公共DLQ队列 .

相关问题