首页 文章

Apache camel(这里使用EIP?)使用重复的头数据聚合/丰富所有行

提问于
浏览
1

我有一个CSV我正在处理格式如下:

01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5

01指定 Headers 行,02指定正文行 .

我需要获取 Headers 数据并将其添加到正文消息,所以我最终发送这样的消息:

H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5

我试图聚合,但在这种情况下似乎不是正确的EIP,因为我只是反复地将相同的消息组合而不是将多个消息组合成一个...在基本级别我需要访问 Headers 数据,以处理身体(事实上,它只是一个领域) . 我只是不知道如何设置变量,因为 Headers 和属性在每次交换时都被清除 . 有小费吗?提前致谢 . 让我知道是否有助于看到骆驼路线 .

这是骆驼路线可能会有所帮助:

from("direct:inventory")
    .split(body().tokenize("\n")).streaming()
    .throttle(100)
    .choice()
      .when(property("CamelSplitComplete").isEqualTo(true))
        .log("Processed ${property.CamelSplitSize} updates")
      .end()
    .unmarshal(csv)
          .log("${body}")
          .aggregate(header("CamelFileLastModified"), new InventoryAggregationStrategy())
          .completionPredicate(header("aggregationComplete").isEqualTo(true))
          .to("freemarker://templates/inventory.ftl")
          .unmarshal().string("UTF-8")
          .unmarshal().json(JsonLibrary.Jackson)
          .convertBodyTo(JsonObject.class)
          .to("endpoint");

Here is the spec for the data

1 回答

  • 1

    您可以随时使用简单的方法并使用bean .

    public class CamelHeadersAndRows {
        public static class HeaderBean {
            String header = null;
            public void setHeader(String body) {
                header = body.substring("01,".length());
            }
            public String useHeader(String body) {
                return header + "," + body.substring("02,".length());
            }
        }
    
        public static void main(String[] args) throws Exception {
            final HeaderBean headerBean = new HeaderBean();
    
            Main camelMain = new Main();
            camelMain.addRouteBuilder(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("timer:foo?period=1s&repeatCount=1")
                            .setBody(constant(
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5"
                            ))
                            .to("direct:inventory");
    
                    from("direct:inventory")
                            .split(body().tokenize("\n")).streaming()
                            .choice()
                                .when().simple("${body} regex '^01.*'")
                                    .bean(headerBean, "setHeader")
                                    .stop()
                                .otherwise()
                                    .bean(headerBean, "useHeader")
                            .end()
                            .log("message: ${body}")
                    ;
                }
            });
            camelMain.run();
        }
    
    }
    

相关问题