首页 文章

查询Spring消息驱动通道适配器

提问于
浏览
1

我正在使用Spring的消息驱动通道适配器 . 我的组件正在消费来自Tibco主题和发布到RabbitMQ主题的消息

所以消息流如下:Tibco->(订阅者)组件(发布到) - > RabbitMQ

服务激活器如下所示:我们看到有一个输入通道和一个输出通道 . bean storeAndForwardActivator将具有业务逻辑(在方法createIssueOfInterestOratorRecord中)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

我还有一个message = driven-channel-adapter . 在调用服务适配器之前将调用此适配器 .

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

即具体而言,容器(如下所示)将保存要使用的主题名称 - 这是DefaultMessageListenerContainer

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

这个设置非常好 . 但是在某些情况下,我的消费者/组件会收到“流氓”消息 . 即一个空的有效负载或HashMap的消息类型(而不是纯TextMessage) - 当我们得到这个 - 我观察到的是 - 在DefaultMessageListener级别捕获异常(即我没有到我的业务bean,即storeAndForwardActivator) ),因为这个我的组件没有发回ACK - 因为这是一个持久的主题 - 在主题上有一个消息的构建 - 这是不可取的 . 有没有办法让我立即确认消息而不管天气如何在DefaultMessageListener级别捕获异常?

或者我应该在DefaultMessageListener中引入错误处理程序?什么是处理这个的最好方法,任何建议?

问候D.

更新:

我尝试将一个errorHandler添加到org.springframework.jms.listener.DefaultMessageListenerContainer中,如下所示

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler是一个bean,如下面的shpwn

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler实现了ErrorHandler

@Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

我注意到的是异常被捕获(当订阅者使用恶意消息时) . 我继续看着这个日志循环

Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException

即,由于事务未提交 - 来自持久主题的相同消息被一次又一次地消耗 . 我的目的是在消费消息后将ACK发回给经纪人(无论天气如何,都会发现异常) .

我明天会尝试错误 Channels .

问候D.

1 回答

  • 1

    error-channel 添加到消息驱动的适配器; ErrorMessage 将包含一个有两个字段的 MessagingException 有效负载; cause (例外)和 failedMessage .

    如果使用默认值 error-channel="errorChannel" ,则会记录异常 .

    如果您想要做更多的事情,您可以配置自己的错误通道并为其添加一些流量 .

    EDIT:

    继续下面的评论......

    payload must not be null 不是堆栈跟踪;这是一条消息 .

    也就是说, payload must not be null 看起来像一个Spring Integration消息;它可能在消息转换期间抛出在消息监听器适配器中,这是在我们到达故障可以转到 error-channel 的点之前;这样的异常将被抛回容器中 .

    打开DEBUG日志记录并查找此日志条目:

    logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");
    

    此外,提供完整的堆栈跟踪 .

    EDIT#2

    所以,我通过在自定义 MessageConverter 中强制转换的有效负载为空来重现您的问题 .

    事务回滚后,容器会调用DMLC错误处理程序,因此无法停止回滚 .

    我们可以为适配器添加一个选项来以不同的方式处理这些错误,但这需要一些工作 .

    与此同时,解决方法是编写自定义 MessageConverter ;类似于this Gist中的那个 .

    然后,您的服务将不得不处理“收到的错误消息”有效负载 .

    然后你提供这样的自定义转换器......

    <jms:message-driven-channel-adapter id="jmsIn"
            destination="requestQueue" acknowledge="transacted"
            message-converter="converter"
            channel="jmsInChannel" />
    
    <beans:bean id="converter" class="foo.MyMessageConverter" />
    

相关问题