首页 文章

Apache Camel迭代List

提问于
浏览
1

我是Apache Camel的新手,我在理解如何实现简单的集成任务时遇到了问题:

  • REST服务通过Apache Camel路由调用Spring Bean

  • Spring Bean返回类的集合(ArrayList)

我需要迭代集合中的每个项目,并通过自定义转换器将其转换为另一种类型 .

我似乎应该使用Split和Aggregator,但是如何约束Aggregator来使用原始列表中的所有项目,而不是更多,也不是更少 . 另外,如何将一个项目转换为另一个项目?我应该使用类型转换器吗?

有人可以给我一个简单的例子吗?

更新1

对不起,我不得不撤消对提供的示例的接受,因为它实际上没有回答我的问题 . 以下是用例限定:我需要从 to("bean:someBean") 调用中拆分和转换方法返回值,而不是从某个 endpoints 拆分和转换输入值 .

用例是

  • 呼叫一些 endpoints ;例如关于休息服务的GET请求,在我的情况下: from("endpoint")

  • 调用bean并获取它的返回值;像 Listto("bean:someBean")

  • 将返回值转换为另一个 List

  • List 转换为消费者

更新2

所以,我可以确认使用 end() 方法并不能解决我的问题 .

这是代码:

rest("some/service").get().produces("application/json").to("bean:someBean?method=getListOfObjects").route().split(body(), (oldExchange, newExchange) -> {
                List<ObjectToConvert> oldList = oldExchange.getIn(List.class);
                List<NewObject> convertedList = taskList.stream().map(ObjectToConvert::new).collect(Collectors.toList());
                newExchange.getOut().setBody(convertedList);

                return newExchange;
            }).end();

使用这种路由,我在应用程序服务器上得到以下异常:

19:30:21,126 ERROR [org.jboss.as.controller.management-operation] (XNIO-1 task-5) JBAS014613: Operation ("full-replace-deployment") failed - address: (undefined) - failure description: {"JBAS014671: Failed services" => {"jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel" => "org.jboss.msc.service.StartException in service jboss.undertow.deployment.default-server.default-host./CamundaLearningCamel: Failed to start service
    Caused by: java.lang.RuntimeException: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route2 at: >>> Split[{body} -> []] <<< in route: Route(route2)[[From[rest:get:task/all?produces=application%2... because of Definition has no children on Split[{body} -> []]
    Caused by: java.lang.IllegalArgumentException: Definition has no children on Split[{body} -> []]"}}

3 回答

  • 1

    以下是拆分聚合并转换列表消息的完整示例 .

    • camel splitter提供了一个内置聚合器,它聚合原始交换中的所有拆分消息 . 因此,拆分器只聚合"direct:start"中发送的每个列表(交换)的消息 . 您必须提供自定义聚合策略,因为默认的策略将使用原始交换,在我的示例中为InOrder . 聚合策略是拆分定义的第二个参数 .

    • 类型转换器使我们有机会在DSL中使用convertBodyTo . 您还可以使用bean,处理器或自定义聚合策略中的所有转换来实现这一点 .

    package org.mydemocamel.app;
    import org.apache.camel.*;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.mock.MockEndpoint;
    import org.apache.camel.processor.aggregate.AggregationStrategy;
    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
    import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
    import org.apache.camel.support.TypeConverterSupport;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.apache.camel.util.toolbox.FlexibleAggregationStrategy;
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class CamelSplitAggregateConvertTest extends CamelTestSupport {
    
        @Produce
        private ProducerTemplate template;
    
        @EndpointInject(uri = "mock:out")
        private MockEndpoint mock;
    
        @Test
        public void testSplitAggregateConvertOrder(){
            InOrder inOrder1 = new InOrder();
            inOrder1.setId("1");
    
            InOrder inOrder2 = new InOrder();
            inOrder2.setId("2");
    
            List<InOrder> inOrderList = new ArrayList<InOrder>();
            inOrderList.add(inOrder1);
            inOrderList.add(inOrder2);
    
            template.sendBody("direct:start", inOrderList);
    
            mock.expectedMessageCount(1);
            Exchange outList = mock.getReceivedExchanges().get(0);
            List<OutOrder> outOrderList = outList.getIn().getBody(List.class);
    
            assertEquals(1, outOrderList.get(0).getId());
            assertEquals(2, outOrderList.get(1).getId());
    
    
        }
    
        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {
            return new RouteBuilder() {
                @Override
                public void configure() throws Exception {
    
                    context.getTypeConverterRegistry().addTypeConverter(OutOrder.class, InOrder.class, new MyOrderTypeConverter());
    
                    from("direct:start")
                    .split(body(), new AggregationStrategy() {
                        @Override
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                List<OutOrder> orders = new ArrayList<OutOrder>();
                                OutOrder newOrder = newExchange.getIn().getBody(OutOrder.class);
                                orders.add(newOrder);
                                newExchange.getIn().setBody(orders);
                                return newExchange;
                            }
                            List<OutOrder> orders = oldExchange.getIn().getBody(List.class);
                            OutOrder newOutOrder = newExchange.getIn().getBody(OutOrder.class);
                            orders.add(newOutOrder);
                            oldExchange.getIn().setBody(orders);
                            return oldExchange;
                        }
                    })
                    .convertBodyTo(OutOrder.class)
                    .end()  //splitter ends here and the exchange body  is now List<OutOrder> 
                    .to("mock:out");
    
                }
            };
        }
    
        private static class MyOrderTypeConverter extends TypeConverterSupport {
    
            @SuppressWarnings("unchecked")
            public <T> T convertTo(Class<T> type, Exchange exchange, Object value) {
                // converter from inorder to outorder bean
                OutOrder order = new OutOrder();
                order.setId(Integer.parseInt(((InOrder) value).getId()));
                return (T) order;
            }
        }
    
        private static class OutOrder {
            private int id;
    
            public void setId(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
        }
    
        private static class InOrder {
            private String id;
    
            public void setId(String id) {
                this.id = id;
            }
    
            public String getId() {
                return id;
            }
        }
    }
    
  • 3

    你是对的是拆分器和聚合器,http://camel.apache.org/splitter.html来自http://camel.apache.org/splitter.html显示你需要的东西 .

    在拆分器之后你需要"converted"对象列表吗?如果是,http://camel.apache.org/aggregator2.html中的"Using a List in AggregationStrategy"点看起来正确满足您的需求 .

    亲切的问候,土工

  • 0

    为了将来参考,您可以使用Java DSL的 loop 构造迭代列表的另一种方法 . 这是一个例子:

    from("direct:myDirect")
    .loop(header("LIST_LENGTH")) // You will set this header in the processor before with the list lenght.
    .process(new Processor(){
    
          @Override
          public void proocess(Exchange arg0){
              MyObj currentElement = (MyObj) arg0.getIn().getBody(List.class).get(LOOP_INDEX);
              // Do your processing here.
          }
    })
    .end()
    .end();
    

    LOOP_INDEX 属性将包含当前迭代,从0开始到LIST_LENGHT标头值,因此您可以使用它从列表中获取当前元素 .

    注意双 end() 方法调用:一个用于结束循环,另一个用于结束路由 .

    文档:http://camel.apache.org/loop.html

相关问题