首页 文章

spring 集成的并行丰富 - 初始线程丢失

提问于
浏览
0

我们有以下spring集成流程:

jms消息驱动的通道适配器 - > ... - > pub / sub channel - > 3 richhers subscribed - > aggregator - > ...

每个浓缩器都指定了 task-scheduler ,因此它们并行工作 .

不幸的是,这种方法不能正常工作,因为原始的JMS线程丢失了 .

我期望 jms-message-driver-channel-adapteraggregator 在同一个线程中运行,但 aggregator (以及后续处理程序)在"last" richher线程中运行 .

我怎样才能做到这一点?我没有看到 spring-int docs中的任何地方 .

Added after Gary's reply

我决定以更自然的方式实现它:

<int:service-activator method="enrich" input-channel="in" output-channel="out">
  <bean class="com.xxx.ParallelEnricher" p:timeoutMs="10000">
    <constructor-arg ref="taskExecutor" />
    <constructor-arg>
      <list>
        <bean class="com.xxx.Enricher1" />
        <bean class="com.xxx.Enricher2" />
        <bean class="com.xxx.Enricher3" />
      </list>
    </constructor-arg>
  </bean>
</int:service-activator>

ParallelEnricher是一个可重用的类,它为每个richr调用“Future taskExecutor.submit(Runnable)”并处理超时 . 可能是我遗漏了一些东西但是在同一个消息上配置并行操作会很好:

<int:service-activator method="enrich" input-channel="in" output-channel="out"
                       timeout="10000" task-executor="taskExecutor">
  <list>
    <bean class="com.xxx.Enricher1" />
    <bean class="com.xxx.Enricher2" />
    <bean class="com.xxx.Enricher3" />
  </list>
</int:service-activator>

1 回答

  • 1

    一种解决方案是添加网关中间流;这样,JMS线程将等待来自最终消费者的回复 - 必须返回"something"(只是离开他的 output-channel 并且框架将返回到网关的回复,在那里它可以被丢弃)...

    <int:service-activator input-channel="fromJMS" output-channel="nullChannel" 
        ref="gw" />
    
    <int:gateway default-request-channel="myPubSub" />
    

    只需确保最终消费者(可能是在聚合器之后的某个地方)发送回复(它无关紧要;它将被发送到 nullChannel 而被丢弃) .

    请注意,默认情况下线程将无限期地等待;如果需要超时并回滚消息,则需要额外的逻辑 .

    另一个解决方案是添加第四个订户,一个来自 QueueChannel 的简单服务 . 同样,最终消费者发送消息以触发释放 . 您需要在pub / sub之前添加一个标头,以便为每条消息添加一个新的 QueueChannel 标头( <int:header name="myReleaseTriggerChannel" expression="new ...QueueChannel()"/> ;最终消费者将"something"发送到该标头(可能是一个简单的路由器) .

    此解决方案需要一些用户代码(从队列中接收),但以这种方式处理超时更容易 .

相关问题