首页 文章

不使用消息驱动通道适配器调用Spring Integration错误通道

提问于
浏览
1

注意:由于我正在处理的项目的依赖性要求,我使用的是Spring Integration 3.0.5.RELEASE .

我已经使用重新传递策略配置了连接工厂(我们使用的是Active MQ 5.7)

<amq:connectionFactory id="cms.jmsConnectionFactory"
    brokerURL="${cms.jms.brokerURL}" p:redeliveryPolicy-ref="cms.jms.redeliveryPolicy"
    p:prefetchPolicy-ref="cms.jms.prefetchPolicy" p:userName="${cms.jms.userName}" p:password="${cms.jms.password}"/>


<amq:redeliveryPolicy id="cms.jms.redeliveryPolicy"
    p:backOffMultiplier="5"
    p:initialRedeliveryDelay="1000"
    p:maximumRedeliveries="6"
    p:redeliveryDelay="1000"
    p:useExponentialBackOff="false"
    p:useCollisionAvoidance="false" />

我有一个服务激活器,它通过消息驱动通道适配器上的消息激活

<int-jms:message-driven-channel-adapter channel="cms.jms.archiveNodeChannel"
    connection-factory="cms.jms.cachedConnectionFactory" destination="cms.jms.archiveNodeDestination" transaction-manager="cms.jms.txManager"
    acknowledge="transacted" concurrent-consumers="1" max-concurrent-consumers="1" cache-level="3"/>

<int:service-activator ref="cms.int.nodeArchiver" method="archiveNode" input-channel="cms.jms.archiveNodeChannel"/>

在某些情况下,服务激活器可以抛出RuntimeException . 我想要的行为是根据重新传递策略的maximumRedeliveries属性重新传递消息6次,然后在Spring Integration层调用错误处理程序但是抛出RuntimeException时实际发生了什么是消息重新传递,直到我杀死应用程序 . 我尝试将错误通道添加到消息驱动通道适配器,但从未激活过 .

This post略显陈旧,但建议在JMS消息驱动的场景中 . DefaultMessageListenerContainer是"caller",只记录RuntimeException(在AbstractMessageListenerContainer基类中) .

可能是我必须扩展DefaultMessageListenerContainer以不同的方式处理RuntimeExceptions,但我想知道是否有人可以指出我更优雅的解决方案或告诉我,我的方法是完全错误的 .

--- JMS Logging(我已经缩进以使其更具可读性) . 日志显示从第一次错误"No file type was found for file 9789814316385.txt"到相同错误的下一次的详细信息(此跟踪无限重复,直到系统关闭 . )确实 redeliveryCounter is always to 0 我不知道为什么会出现这种情况

ERROR  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#2-1] Created JMS transaction on Session 
   [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:6,started=true}] 
   from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] 
   org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: [Payload=]
[Headers={sequenceNumber=1, sequenceSize=1, jms_timestamp=1437721454735, , jms_redelivered=false, , 
   jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session:
 ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage 
   {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, 
   originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
   correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, 
   compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, 
   properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: [Payload=]
   [Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454735, , jms_redelivered=false, , 
   jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:818, timestamp=1437721454738}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession
 {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession
 {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] 
from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] 
   from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }] of session 
   [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage 
   {commandId = 4953, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:819, originalDestination = null, 
   originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode 
   transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1644, expiration = 0, timestamp = 1437721454772, 
   arrival = 0, brokerInTime = 1437721454773, brokerOutTime = 1437721454774, correlationId = null, replyTo = null, 
   persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, 
   compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@3ee1137b, 
   dataStructure = null, redeliveryCounter = 0, size = 0, properties = 
   {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454738}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }]
   to integration Message payload []
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel', 
   message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , 
   jms_redelivered=false, , , id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , 
   , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] org.springframework.integration.jms.JmsSendingMessageHandler#4 received message: 
   [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , , 
   id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Executing callback on JMS Session: Cached JMS Session: ActiveMQSession
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Sending created message: ActiveMQTextMessage 
   {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null,
   destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, 
   correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, 
   targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, 
   redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775}, 
   readOnlyProperties = false, readOnlyBody = false, droppable = false, text = }
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] postSend (sent=true) on channel 'cms.jms.archiveNodeChannel', message: 
   [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454772, , jms_redelivered=false, , , 
   id=20a37cfd-1a9a-1e90-0db0-7fef64b4c02a, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:819, timestamp=1437721454775}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Initiating transaction commit
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Creating new transaction with name 
   [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Created JMS transaction on Session [Cached JMS Session: ActiveMQSession 
   {id=ID:simac.home-56110-1437721286921-0:1:7,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:simac.home-56110-1437721286921-0:1,
   clientId=ID:simac.home-56110-1437721286921-1:1,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] 
   from consumer [Cached JMS MessageConsumer: ActiveMQMessageConsumer { value=ID:simac.home-56110-1437721286921-0:1:2:1, started=true }]
   of session [Cached JMS Session: ActiveMQSession {id=ID:simac.home-56110-1437721286921-0:1:2,started=true}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] converted JMS Message [ActiveMQTextMessage 
   {commandId = 4959, responseRequired = false, messageId = ID:simac.home-56110-1437721286921-0:1:7:1:820, originalDestination = null, 
   originalTransactionId = null, producerId = ID:simac.home-56110-1437721286921-0:1:7:1, destination = queue://cms.jms.queue.archiveNode, 
   transactionId = TX:ID:simac.home-56110-1437721286921-0:1:1646, expiration = 0, timestamp = 1437721454775, arrival = 0, brokerInTime = 1437721454776, 
   brokerOutTime = 1437721454777, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, 
   groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@214372df,
   dataStructure = null, redeliveryCounter = 0, size = 0, properties = {sequenceNumber=1, , sequenceSize=1, , , , , , , timestamp=1437721454775}, 
   readOnlyProperties = true, readOnlyBody = true, droppable = false, text = }] to integration Message payload []
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] preSend on channel 'cms.jms.archiveNodeChannel', 
   message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, , , jms_timestamp=1437721454775, , jms_redelivered=false, , , 
   id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, timestamp=1437721454778}]
DEBUG  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] ServiceActivator for 
   [org.springframework.integration.handler.MethodInvokingMessageProcessor@755cd5ee] received message: [Payload=][Headers={sequenceNumber=1, sequenceSize=1, 
   , , jms_timestamp=1437721454775, , jms_redelivered=false, , , id=22069a85-30c5-5478-1b6c-10b6264575b1, , , jms_messageId=ID:simac.home-56110-1437721286921-0:1:7:1:820, 
   timestamp=1437721454778}]
ERROR  [org.springframework.jms.listener.DefaultMessageListenerContainer#4-1] No file type was found for file 9789814316385.txt

附加调试器后,我可以看到服务激活器抛出异常的原因没有传播:

问题发生在 org.springframework.integration.dispatcher.UnicastingDispatcher

private boolean doDispatch(Message<?> message) {
    boolean success = false;
    Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
    if (!handlerIterator.hasNext()) {
        throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
    }
    List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
    while (success == false && handlerIterator.hasNext()) {
        MessageHandler handler = handlerIterator.next();
        try {
            handler.handleMessage(message);
            success = true; // we have a winner.
        }
        catch (Exception e) {
            RuntimeException runtimeException = (e instanceof RuntimeException)
                    ? (RuntimeException) e
                    : new MessageDeliveryException(message,
                            "Dispatcher failed to deliver Message.", e);
            if (e instanceof MessagingException &&
                    ((MessagingException) e).getFailedMessage() == null) {
                ((MessagingException) e).setFailedMessage(message);
            }
            exceptions.add(runtimeException);
            this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
        }
    }
    return success;
}

如果成功处理消息,上面显示的doDispatch将循环 Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message); 返回的每个MessageHander .

handlerIterator包含2个MessageHandlers,第一个是我的Service Activator(抛出RuntimeException),第二个是org.springframework.integration.jms.JmsSendingMessageHandler(我不知道为什么会这里)

当Service Activator抛出异常时,它将传递给下面显示的handleExceptions方法:

/**
 * Handles Exceptions that occur while dispatching. If this dispatcher has
 * failover enabled, it will only throw an Exception when the handler list
 * is exhausted. The 'isLast' flag will be <em>true</em> if the
 * Exception occurred during the final iteration of the MessageHandlers.
 * If failover is disabled for this dispatcher, it will re-throw any
 * Exception immediately.
 */
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
    if (isLast || !this.failover) {
        if (allExceptions != null && allExceptions.size() == 1) {
            throw allExceptions.get(0);
        }
        throw new AggregateMessageDeliveryException(message,
                "All attempts to deliver Message to MessageHandlers failed.", allExceptions);
    }
}

由于Service Activator不是最后一个处理程序(isLast = false),因此不会抛出异常 . 第二个JmsSendingMessageHandler处理消息(虽然我不知道为什么)并且消息只是继续被重新传递 . 有谁知道为什么这个JmsSendingMessageHandler也试图处理为Service Activator指定的消息?

1 回答

  • 0

    <publish-subscribe-channel> 适合你!你真的可以添加所有这些组件作为订阅它们将在 order 中调用它们如何在配置中声明它们或根据它们预先配置的 order 属性 .

相关问题