首页 文章

Camel“组合消息处理器”如何检测“onComplete”

提问于
浏览
0

在我的骆驼路线中,我必须使用Composed Message Processor . 这意味着我首先必须拆分交换(从db的大结果集),然后我必须聚合它 .

我通过使用解耦的seda队列看到的问题是如何检测工作已完成(路由是手动触发的,并且将运行几个小时) . 我必须回调工作完成的基础设施 .

使用“仅分离器”变体不是一个好选择,因为我会松开聚合传播器 - 相关逻辑(相关的交换必须按顺序) . 但最大的问题是失去了completionSize . 我不能聚合成多个组 .

以下是该模式的示例:

from("direct:start")
    .split().body()
    .end()
    .to("seda:aggregate");

// collect and re-assemble the validated OrderItems into an order again
from("seda:aggregate")
    .process(setHeaders)
    .aggregate(new MyOrderAggregationStrategy()).header("orderId").completionSize(header("count"))
    .parallelProcessing()
    .process(doTheWork)
    .to("mock:result");

这里仅为拆分器的示例:

from("direct:start")
    .routePolicy( finishNotifier ) //implements onExchangeDone
    .split(body(), new MyOrderStrategy())
    .parallelProcessing()
    .process(doTheWork)
    .to("bean:MyOrderService?method=buildCombinedResponse")

作为旁注:只有链接拆分和聚合才能在禁用parallelProcessing时起作用 . 启用parallelProcessing后,在finishNotifier.onExchangeDone之后调用“doTheWork” . 因为如果工作结束我调用了context.stop(),在camel停止后调用“process”方法!

2 回答

  • 0

    很抱歉回答我自己的问题但我找到了一个非常简单的解决方案 . 我遇到这种模式的原因是骆驼用户组的一个建议 . 根本问题是聚合器模式中的broken(imho) parallelProcessing() . 我的原始路由只是链分路器和聚合器,但需要在聚合器的输出上进行多线程处理 .

    解决方案很简单:只需使用“threads()”而不是parallelProcessing() .

  • 0

    您可以实施自己的自定义聚合策略,该策略依赖于计算接收的交换数量与进程所具有的记录总数 . 您还可以查看动作书第8-10章中的驼峰,以获得许多基本实现示例的分离器和聚合模式的非常好的参考示例 . 如果您使用骆驼,我强烈建议至少选择一份PDF副本,因为它可以很容易地覆盖您的典型企业用例 .

    此博客文章也非常适合解决使用案例正确设置聚合的一些问题:

    http://tmielke.blogspot.com/2009/01/using-camel-aggregator-correctly.html

相关问题