我有一个Camel Route,它从ActiveMQ JMS队列中读取,进行一些处理,并将结果传递给远程目标 . 如果此进程的通信部分失败,我想无限期地重试(直到目标为"up") . 我可以通过制作路线 transacted
并设置 redeliveryPolicy
来处理这个问题 . 重新传递在Camel中设置(通过 camel.processor.RedeliveryPolicy
重新尝试路由的失败部分)和ActiveMQConnectionFactory(通过 org.apache.activemq.RedeliveryPolicy
重新尝试整个路由) .
我还要求我们应该能够从JMS队列中删除一个条目(我通过一个通过JMX与ActiveMQ对话的应用程序),并且处理应该转移到下一条消息 .
问题是我可以允许删除消息(通过将消费者设置为 cacheLevelName=CACHE_NONE
),或者让连接句柄重试(通过将消费者设置为 cacheLevelName=CACHE_CONSUMER
),但不能同时重置两者 .
这是我目前的设置:
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerUrl}" />
<property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
<property name="prefetchPolicy">
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="0"/>
</bean>
</property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="transacted" value="true"/>
<property name="concurrentConsumers" value="1"/>
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
<property name="configuration" ref="jmsConfig" />
</bean>
<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="5000" />
<property name="redeliveryDelay" value="5000" />
<property name="maximumRedeliveries" value="-1" />
<property name="queue" value=">" />
</bean>
<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="maximumRedeliveries" value="2"/>
<property name="redeliveryDelay" value="500"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<constructor-arg>
<bean class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
</constructor-arg>
</bean>
<camel:camelContext id="testContext">
<camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>
通过一个非常简单的路线:
@Component
public class TransactedRoute extends SpringRouteBuilder {
@Override
public void configure() {
onException(Exception.class) // we would handle comms exceptions here
.redeliveryPolicyRef("redeliveryProfile")
.rollback();
from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
.transacted("PROPAGATION_REQUIRED")
.bean(Bean1.class)
.to(DESTINATION); // this could throw a comms exception
}
}
使用此设置,路由从 jms:queue:myqueue
读取,并且在Comms异常的情况下,Camel在内部重试路径 to
部分两次( redeliveryProfile
) . 然后在5秒延迟后,消息再次通过整个路径发送( amqRedeliveryPolicy
) . 这一直持续到我停止测试 .
但是,如果我从ActiveMQ中删除该消息,它将继续由路由处理,尽管它不再在队列中 .
如果我将消费者改为:
from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")
我现在可以从ActiveMQ中删除该消息并且路由停止处理它......但忽略 amqRedeliveryPolicy
,立即重试消息(没有5秒延迟)并且在6次尝试(AMQ的默认值)之后将其放入死亡信件队列 .
那么,有没有办法通过修改我的配置来实现这两者?
或者我完全错过了沿线的某个地方?
1 回答
如果您需要人工干预以通过JMX删除队列中的特定消息,那听起来有点像糟糕的设计 .
也许你可以让AMQ在X次尝试失败后将消息移动到DLQ - 永远重新发送也是一个糟糕的设计 . 但是如果6次尝试很少,你可以将默认值增加到更高的值 .
然后,您可以从DLQ队列检查消息,并尝试了解消息失败的原因,并始终可以安全地通过JMX或其他工具删除或清除DLQ队列 .
AMQ允许每个队列具有DLQ,您可以将其配置为使用DLQ前缀等,而不是一个公共DLQ队列 .