首页 文章

如何拆分消息对其中一个进行额外处理并将其聚合回来

提问于
浏览
1

我需要根据一些配置文件配置一些camel路由 .

所有已配置的路由都需要将消息拆分为一个或两个子消息,然后对第一个消息执行一些JMS集成工作,然后将JMS应答与可选的第二个消息聚合在一起 . 在简化的图片中,它将如下所示:

message -- > split  --> message 1 --> JMS request/reply --> aggregate --> more processing
                   \--> message 2                      /

聚合将在完成大小上完成,如果它将是1或2,我可以提前知道,具体取决于路由元数据 . 当存在第二条消息时,在与JMS回复合并之前不需要其他处理 .

简而言之,我需要进行拆分,然后进行路由,然后进行聚合,这是一种非常常见的模式 . 唯一的特点是,如果第二个拆分消息存在,我不需要在聚合它之前对其做任何事情 .

在java DSL中,它看起来像这样:

from("direct:abc")
    // The splitter below  will set the JmsIntegration flag
    .split().method(MySplitter.class, "split")
    .choice()
        .when(header("JmsIntegration"))
            .inOut("jms:someQueue"))
        .otherwise()
            // what should I have on here?
            .to(???)
    .end()
    .aggregate(...)to(...);

所以我的问题是:

  • 我应该在其他分支上放什么?

实际上我需要的是 if :如果拆分消息需要JMS转到JMS然后转移到聚合器,如果它不直接转到聚合器 . 我正在考虑创建一个虚拟处理器,它实际上什么都不做,但这对我来说似乎是一个天真的方法 .

  • 我走错了路吗?如果是这样的话会有什么选择

最初我正在考虑增加消息,但我不想将原始消息发送给JMS

  • 我还考虑将我的聚合策略放在我的分配器中,但我再次无法将它们放在一起 .

3 回答

  • 0

    Splitter会自动将拆分交换聚合在一起 . 但是,默认(自2.3)聚合策略是返回原始交换 . 您可以通过直接在Splitter上指定默认策略来轻松覆盖默认策略 . 此外,如果您的选择没有替代流程,那么使用Filter会更容易 . 例:

    from("direct:abc")
        .split().method(MySplitter.class, "split").aggregationStrategy(new MyStrategy())
            .filter(header("JmsIntegration"))
                .inOut("jms:someQueue"))
            .end()
        .end()
        .to(...);
    

    您仍然需要实现 MyStrategy 来组合这两条消息 .

  • -1

    基于您的帖子,看起来您正在尝试使用原始消息返回您的浓缩合并,但您希望将自定义消息发送到jms endpoints . 我建议将您的原始邮件存储在bean或缓存中或类似的东西中,利用所有与camel的转换,然后让您的聚合策略利用您的存储返回所需的格式 .

    from("direct:abc")
        .split().method(MySplitter.class, "split")
            .choice()
                .when(header("JmsIntegration"))
                    .beanRef("MyStorageBean", "storeOriginal")
                    .convertBodyTo(MyJmsFormat.class)
                    //This aggregation strategy could have a reference 
                    //to your storage bean and retrieve the instance
                    .enrich("jms:someQueue", myCustomAggreationStrategyInstance)
                .otherwise()
             .end()
        .aggregate(...)
        .to("direct:continueProcessing");
    

    选项#2:基于你的评论说你需要“直接:abc endpoints 收到的原始消息可以简化很多 . 在这个例子中我们可以使用camel现有的原始消息存储来检索传递给直接的消息:abc . 如果您在拆分后的消息有一个JmsIntegration标头,我们会将主体转换为所需的jms调用格式,利用enrich语句进行jms调用,并使用自定义聚合器,使您可以访问用于调用的消息jms endpoints ,返回的消息,以及原始消息direct:abc has . 如果您的流没有JmsIntegration头,则消息将转到路由中的OTHER语句,该语句在结束选择语句之前不进行其他处理然后将吐出的消息与您需要的任何自定义策略一起聚合在一起 .

    from("direct:abc")
        .split().method(MySplitter.class, "split")
            .choice()
                .when(header("JmsIntegration"))
                    .convertBodyTo(MyJmsFormat.class)
                    //See aggregationStrategy sample below
                    .enrich("jms:someQueue", myAggStrat)
                .otherwise()
                    //Non JmsIntegration header messages come here, 
                    //but receive no work and are passed on.
             .end()
        .aggregate(...)
        .to("direct:continueProcessing");
    
    //Your Custom Aggregator
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        //This logic will retrieve the original message passed into direct:abc
        Message originalMessage =(Message)exchange.getUnitOfWork().getOriginalInMessage();
        //TODO logic for manipulating your exchanges and returning the desired result
    }
    
  • 1

    你说你考虑过使用Enricher,但是你不想发送原始信息 . 您可以使用pre-JMS路由来巧妙地解决此问题:

    from("direct:abc")
        .enrich("direct:sendToJms", new MyAggregation());
        .to("direct:continue");
    
    from("direct:sendToJms")
        // do marshalling or conversion here as necessary
        .convertBodyTo(MyJmsRequest.class)
        .to("jms:someQueue");
    
    public class MyAggregation implements AggregationStrategy {
    
        public Exchange aggregate(Exchange original, Exchange resource) {
            MyBody originalBody = original.getIn().getBody(MyBody.class);
            MyJmsResponse resourceResponse = resource.getIn().getBody(MyJmsResponse.class);
            Object mergeResult = ... // combine original body and resource response
            original.getIn().setBody(mergeResult);
            return original;
        } 
    }
    

相关问题