我想创建一个执行以下步骤的路由
-
读取文件
-
将文件拆分为行
-
使用BeanIO解组线路
-
呼叫(昂贵且耗时)处理器
我希望在处理完所有行后完成此路由 .
所以我的路线的基本布局如下:
from("direct:start")
.pollEnrich("file:..")
.split(bodyAs(String.class).tokenize(RECORD_DELIMITER))
.unmarshal(beanIODataFormat)
.process(doTheWork);
现在我想并行处理 doTheWork
处理器以加速处理,因为这个操作是线程安全的,执行顺序无关紧要 .
最简单的方法是通过添加 .parallelProcessing()
来并行化拆分器,但是,这是不可能的,因为BeanIO处理不是线程安全的(参见DateTypeHandlerSupport not thread-safe) . 因此,我希望同步完成解组,然后开始并行处理 .
但是,由于我处于InOut / Request-Reply交换模式,我不能只使用具有多个处理器的SEDA组件,因为分割器将等待当前子消息被完全处理,然后它将下一行发送为子消息 .
相反,我必须将消息作为异步inOnly请求发送,以“鼓励”拆分器发送下一行:
from("direct:start")
.pollEnrich("file:..")
.split(bodyAs(String.class).tokenize(""))
.unmarshal((DataFormatDefinition)null)
.inOnly("seda:child");
from("seda:child?concurrentConsumers=5")
.process(doTheWork);
但是,如果我这样做,第一条路线一旦将所有消息传递到子路由就会完成 .
是否有任何方法可以确保第一个路由仅在处理子路由中的所有消息后完成?或者我如何实现并行处理“实际工作”但是按顺序执行解组?
1 回答
如果我正确理解你,你可以在第一次拆分时使用
AggregationStrategy
来创建List<YourBeanIOFormat>
,然后进行第二次拆分,您可以在其中并行处理每个单独的对象 .现在,我对Java DSL并不熟悉,所以这可能不是100%正确的语法,但希望你能得到这个想法 .