我想使用Apache Camel并行多播到3条路由并聚合(并等待)其中2条路由,而第3条则自行停止(第3条不应阻塞前2条) . 我还需要在“所有”情况下处理这两个,这意味着如果其中一个失败(例如在处理期间抛出异常),它也应该聚合 .

根据我从Camel文档中的理解,只要您没有指定说stopOnException,该行为应该是“默认” . 但是,与异常的交换永远不会进入我的AggregationStrategy . 有点奇怪的是,即使有completionSize(2),聚合后面的处理器也会被执行 .

所以我的问题是:如果没有处理与AggregationStrategy中的异常交换,聚合后的路由如何继续?以及如何正确解决我的案子?请注意,执行.to(“direct:sync”)并非如此,因为此异常可能会从完全超出路由的多播聚合部分的路由中抛出 .

下面是示例代码(ExceptionThrowingProcessor抛出MyException,PropertySetterProcessor并不重要):

CamelContext context = new DefaultCamelContext();
RouteBuilder builder = new RouteBuilder() {
    @Override
    public void configure() throws Exception {
    }
};

builder.onException(MyException.class)
        .process(new PropertySetterProcessor("a", "onException"))
        .handled(true);

builder.from("direct:input")
        .process(new PropertySetterProcessor("a", "before-multicast"))
        .multicast()
        .parallelProcessing()
        .shareUnitOfWork()
        .to("direct:part1", "direct:part2", "direct:part3")

builder.from("direct:part1")
        .process(new PropertySetterProcessor("a", "part1"))
        .to("direct:sync");

builder.from("direct:part2")
        .process(new PropertySetterProcessor("a", "part2"))
        .process(new ExceptionThrowingProcessor("oops")) // throws MyException
        .to("direct:sync");

builder.from("direct:part3")
        .process(new PropertySetterProcessor("a", "part3"));
// don't want this to be aggregated within direct:sync

builder.from("direct:sync")
        .aggregate(new TestAggregationStrategy())
        // strategy.aggregate is called only once (from part1) but not from part2 :(
        .constant(true)
        .completionSize(2)
        .process(new PropertySetterProcessor("a", "after-aggregation"));

context.addRoutes(builder);
context.start();

ProducerTemplate template = context.createProducerTemplate();
template.send("direct:input", new DefaultExchange(context, ExchangePattern.InOut));