我们有以下spring集成流程:
jms消息驱动的通道适配器 - > ... - > pub / sub channel - > 3 richhers subscribed - > aggregator - > ...
每个浓缩器都指定了 task-scheduler
,因此它们并行工作 .
不幸的是,这种方法不能正常工作,因为原始的JMS线程丢失了 .
我期望 jms-message-driver-channel-adapter
和 aggregator
在同一个线程中运行,但 aggregator
(以及后续处理程序)在"last" richher线程中运行 .
我怎样才能做到这一点?我没有看到 spring-int
docs中的任何地方 .
Added after Gary's reply
我决定以更自然的方式实现它:
<int:service-activator method="enrich" input-channel="in" output-channel="out">
<bean class="com.xxx.ParallelEnricher" p:timeoutMs="10000">
<constructor-arg ref="taskExecutor" />
<constructor-arg>
<list>
<bean class="com.xxx.Enricher1" />
<bean class="com.xxx.Enricher2" />
<bean class="com.xxx.Enricher3" />
</list>
</constructor-arg>
</bean>
</int:service-activator>
ParallelEnricher是一个可重用的类,它为每个richr调用“Future taskExecutor.submit(Runnable)”并处理超时 . 可能是我遗漏了一些东西但是在同一个消息上配置并行操作会很好:
<int:service-activator method="enrich" input-channel="in" output-channel="out"
timeout="10000" task-executor="taskExecutor">
<list>
<bean class="com.xxx.Enricher1" />
<bean class="com.xxx.Enricher2" />
<bean class="com.xxx.Enricher3" />
</list>
</int:service-activator>
1 回答
一种解决方案是添加网关中间流;这样,JMS线程将等待来自最终消费者的回复 - 必须返回"something"(只是离开他的
output-channel
并且框架将返回到网关的回复,在那里它可以被丢弃)...只需确保最终消费者(可能是在聚合器之后的某个地方)发送回复(它无关紧要;它将被发送到
nullChannel
而被丢弃) .请注意,默认情况下线程将无限期地等待;如果需要超时并回滚消息,则需要额外的逻辑 .
另一个解决方案是添加第四个订户,一个来自
QueueChannel
的简单服务 . 同样,最终消费者发送消息以触发释放 . 您需要在pub / sub之前添加一个标头,以便为每条消息添加一个新的QueueChannel
标头(<int:header name="myReleaseTriggerChannel" expression="new ...QueueChannel()"/>
;最终消费者将"something"发送到该标头(可能是一个简单的路由器) .此解决方案需要一些用户代码(从队列中接收),但以这种方式处理超时更容易 .