我正在使用Spring的消息驱动通道适配器 . 我的组件正在消费来自Tibco主题和发布到RabbitMQ主题的消息
所以消息流如下:Tibco->(订阅者)组件(发布到) - > RabbitMQ
服务激活器如下所示:我们看到有一个输入通道和一个输出通道 . bean storeAndForwardActivator将具有业务逻辑(在方法createIssueOfInterestOratorRecord中)
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
我还有一个message = driven-channel-adapter . 在调用服务适配器之前将调用此适配器 .
<int-jms:message-driven-channel-adapter
id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
container="oratorIssueOfInterestmessageListenerContainer" />
即具体而言,容器(如下所示)将保存要使用的主题名称 - 这是DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
</bean>
这个设置非常好 . 但是在某些情况下,我的消费者/组件会收到“流氓”消息 . 即一个空的有效负载或HashMap的消息类型(而不是纯TextMessage) - 当我们得到这个 - 我观察到的是 - 在DefaultMessageListener级别捕获异常(即我没有到我的业务bean,即storeAndForwardActivator) ),因为这个我的组件没有发回ACK - 因为这是一个持久的主题 - 在主题上有一个消息的构建 - 这是不可取的 . 有没有办法让我立即确认消息而不管天气如何在DefaultMessageListener级别捕获异常?
或者我应该在DefaultMessageListener中引入错误处理程序?什么是处理这个的最好方法,任何建议?
问候D.
更新:
我尝试将一个errorHandler添加到org.springframework.jms.listener.DefaultMessageListenerContainer中,如下所示
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
<property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler是一个bean,如下面的shpwn
<bean id="myErrorHandler"
class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErroHandler实现了ErrorHandler
@Service
public class MyErrorHandler implements ErrorHandler{
private static Log log = LogFactory.getLog(MyErrorHandler.class);
@Override
public void handleError(Throwable t) {
if (t instanceof MessageHandlingException) {
MessageHandlingException exception = (MessageHandlingException) t;
if (exception != null) {
org.springframework.messaging.Message<?> message = exception.getFailedMessage();
Object payloadObject = message.getPayload();
if (null != payloadObject) {
log.info("Payload is not null, type is: " + payloadObject.getClass());
}
}
} else {
log.info("Exception is not of type: MessageHandlingException ");
}
}
}
我注意到的是异常被捕获(当订阅者使用恶意消息时) . 我继续看着这个日志循环
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
即,由于事务未提交 - 来自持久主题的相同消息被一次又一次地消耗 . 我的目的是在消费消息后将ACK发回给经纪人(无论天气如何,都会发现异常) .
我明天会尝试错误 Channels .
问候D.
1 回答
将
error-channel
添加到消息驱动的适配器;ErrorMessage
将包含一个有两个字段的MessagingException
有效负载;cause
(例外)和failedMessage
.如果使用默认值
error-channel="errorChannel"
,则会记录异常 .如果您想要做更多的事情,您可以配置自己的错误通道并为其添加一些流量 .
EDIT:
继续下面的评论......
payload must not be null
不是堆栈跟踪;这是一条消息 .也就是说,
payload must not be null
看起来像一个Spring Integration消息;它可能在消息转换期间抛出在消息监听器适配器中,这是在我们到达故障可以转到error-channel
的点之前;这样的异常将被抛回容器中 .打开DEBUG日志记录并查找此日志条目:
此外,提供完整的堆栈跟踪 .
EDIT#2
所以,我通过在自定义
MessageConverter
中强制转换的有效负载为空来重现您的问题 .事务回滚后,容器会调用DMLC错误处理程序,因此无法停止回滚 .
我们可以为适配器添加一个选项来以不同的方式处理这些错误,但这需要一些工作 .
与此同时,解决方法是编写自定义
MessageConverter
;类似于this Gist中的那个 .然后,您的服务将不得不处理“收到的错误消息”有效负载 .
然后你提供这样的自定义转换器......