首页 文章

寻找并发交换作为apache camel splitter的输入

提问于
浏览
1

我正在实现一个从数据库中读取行的路由,使用拆分器拆分它们,并行处理它们并聚合和更新数据库 . 当分路器路径只有一个输入时,一切都按预期工作 . 示例代码 -

<route>
        <from uri="direct:splitter"/>
        <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
        <split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
            <simple>${body}</simple>
            <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
            <to uri="bean:gisResponseProcessor" />
        </split>
    </route>

当我向direct发送3条消息时:splitter(每条消息需要几分钟才能完成处理)并让它们全部并行处理 . 当我尝试这个时,立即打印所有3个输入的分配器外的第一个日志消息 . 但是,来自拆分器内部的日志消息表明3个交换中的每一个都是一个接一个地拆分 . 每个子消息都使用线程池 . 有没有办法让拆分器并行拆分3个输入交换?

2 回答

  • 0

    是的,使用 seda 路线或 vm 路线 . 这些都做同样的事情,但用途略有不同 . 我建议你看看我对这些差异的回答:camel splitter parallel processing .

    您遇到的问题是您希望将消息并行发送到拆分器 . 因此,消息1,2和3被并行处理 . 这可以通过使用 seda 组件来实现 . seda 组件是异步的,允许您并行处理路由上的消息 .

    试试这条路线:

    <route>
        <from uri="direct:splitter"/>
        <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
         <to uri="seda:sedaSplitter"/>
    </route>
    <route id="sedaSplitter>
        <from uri="seda:sedaSplitter?multipleConsumers=true&amp;concurrentConsumers=16"/>
    
          <split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
              <simple>${body}</simple>
              <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
              <to uri="bean:gisResponseProcessor" />
          </split>
    </route>
    

    请注意,消息将处理到 seda 组件路径,其中 MESSAGE 以并行方式处理 . 您可以通过在拆分器中包含 parallelProcessing="true" 来使此更加平行,以便并行处理拆分中的各个项目 .

    如果您需要更多信息,请大声喊叫 .

  • 0

    检查您的线程池中至少有6个线程可用,名为 myPool

相关问题