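I need to POST JSON to a Camel servlet, then copy the message and split the list. Each split message is processed, and finally the results are aggregated and returned as the servlet response.
However, as soon as I introduce the splitter into the route, I see that the input stream gets closed and the servlet cannot write the response back.
Note that stream caching is enabled as well.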
from("servlet:sample")
    .log(LoggingLevel.INFO, "Message Received: JSON RQ")
    .to("direct:samplejson");

from("direct:samplejson")
    .streamCaching()
    .setHeader("sampleId", simple("${id}"))
    .log(LoggingLevel.INFO, "SampleID in Header: ${header.sampleId}")
    .unmarshal().json(JsonLibrary.Jackson, Sample.class)
    .log(LoggingLevel.INFO, "Converted to JSON: ${body.toString}")
    .process("sampleProcessor")
    .split().body().streaming()
        .choice()
            .when().method("sampleProcessor", "isTypeA")
                .marshal().jacksonxml(Sample.class)
            .endChoice()
            .otherwise()
                .marshal().jacksonxml(Sample.class)
        .end()
        .aggregate(SampleAggregationStrategy).header("sampleId").completionSize(2).completionTimeout(1000L)
        .to("direct:samplexml");

from("direct:samplexml").marshal().jacksonxml(List.class).log("FINISHED PROCESSING");
Exception:
2017-06-12 12:22:25,311 [apr-8080-exec-2] INFO route1 - FINISHED PROCESSING
2017-06-12 12:22:25,321 [apr-8080-exec-2] ERROR CamelHttpTransportServlet - Error processing request
java.io.IOException: Stream closed
at org.apache.catalina.connector.InputBuffer.read(InputBuffer.java:372) ~[catalina.jar:8.0.30]
at org.apache.catalina.connector.CoyoteInputStream.read(CoyoteInputStream.java:156) ~[catalina.jar:8.0.30]
at org.apache.camel.util.IOHelper.copy(IOHelper.java:196) ~[camel-core-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.DefaultHttpBinding.copyStream(DefaultHttpBinding.java:432) ~[camel-http-common-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.DefaultHttpBinding.doWriteDirectResponse(DefaultHttpBinding.java:496) ~[camel-http-common-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.DefaultHttpBinding.doWriteResponse(DefaultHttpBinding.java:395) ~[camel-http-common-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.DefaultHttpBinding.writeResponse(DefaultHttpBinding.java:322) ~[camel-http-common-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.CamelServlet.doService(CamelServlet.java:210) [camel-http-common-2.19.0.jar:2.19.0]
at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:74) [camel-http-common-2.19.0.jar:2.19.0]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [servlet-api.jar:?]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291) [catalina.jar:8.0.30]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) [catalina.jar:8.0.30]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) [tomcat-websocket.jar:8.0.30]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) [catalina.jar:8.0.30]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) [catalina.jar:8.0.30]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) [catalina.jar:8.0.30]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [catalina.jar:8.0.30]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [catalina.jar:8.0.30]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [catalina.jar:8.0.30]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [catalina.jar:8.0.30]
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) [catalina.jar:8.0.30]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [catalina.jar:8.0.30]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:521) [catalina.jar:8.0.30]
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1096) [tomcat-coyote.jar:8.0.30]
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:674) [tomcat-coyote.jar:8.0.30]
at org.apache.tomcat.util.net.AprEndpoint$SocketProcessor.doRun(AprEndpoint.java:2500) [tomcat-coyote.jar:8.0.30]
at org.apache.tomcat.util.net.AprEndpoint$SocketProcessor.run(AprEndpoint.java:2489) [tomcat-coyote.jar:8.0.30]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-util.jar:8.0.30]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
If I remove the splitter and the aggregator, the servlet response is returned.
Thank you very much for any help!
1 Answer
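See the Composed Message Processor EIP pattern (http://camel.apache.org/composed-message-processor.html), in particular the example that uses only the splitter. That variant performs the fork/join within the same unit of work, so the servlet is able to return the response. The splitter has built-in support for an AggregationStrategy, which is where you configure how the responses are aggregated. In other words, do not use a splitter and an aggregator together; use only the splitter.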
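A minimal sketch of the splitter-only variant, written against the Camel 2.19 API that appears in the stack trace (in Camel 3.x the AggregationStrategy interface lives in org.apache.camel instead). Sample and the "sampleProcessor" bean are taken from the question and are not defined here; the class names SampleSplitRoute and ConcatStrategy and the <samples> wrapper element are made up for illustration. The stack trace suggests the servlet was trying to copy the already consumed request stream into the response, which this route avoids because the aggregated body replaces the original one when the split ends.

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class SampleSplitRoute extends RouteBuilder {

    // Hypothetical strategy: appends each marshalled XML fragment to one String body.
    static class ConcatStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            String part = newExchange.getIn().getBody(String.class);
            if (oldExchange == null) {
                // First split part: there is nothing to merge with yet.
                newExchange.getIn().setBody(part);
                return newExchange;
            }
            String merged = oldExchange.getIn().getBody(String.class) + part;
            oldExchange.getIn().setBody(merged);
            return oldExchange;
        }
    }

    @Override
    public void configure() {
        from("servlet:sample")
            .to("direct:samplejson");

        from("direct:samplejson")
            // Sample and "sampleProcessor" come from the question.
            .unmarshal().json(JsonLibrary.Jackson, Sample.class)
            .process("sampleProcessor")
            // The strategy is passed to the splitter itself; no separate aggregate() step.
            .split(body(), new ConcatStrategy())
                .marshal().jacksonxml(Sample.class)
            .end()
            // After end(), the body is the aggregated String, which becomes the servlet response.
            .setBody(simple("<samples>${body}</samples>"))
            .log("FINISHED PROCESSING");
    }
}
```

Because the aggregation runs inside the splitter, the request thread waits for all parts and continues with the merged result, so no completionSize or completionTimeout is needed.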