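I'm new to Apache Camel and I'm trying to implement the following behavior. I receive messages with the JMSReplyTo header populated. When any exception is thrown, I catch it, format it, and log it. Then I want to send it to an error queue and also back to the client via JMSReplyTo. Here is my route configuration.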
onException(Exception.class)
    .transform().simple("Message:\n${body}\nHeaders:\n${headers}\nException:\n${exception.stacktrace}")
    .log(LoggingLevel.ERROR, "errors","${body}")
    .to("activemq:queue:GENERAL.ERRORS")
    .end();

from("activemq:queue:changeProfitCenter.input")
    .choice()
        .when(header("JMSType").isEqualTo("xml"))
            .to("direct:xmlChangeProfitCenter")
        .when(header("JMSType").isEqualTo("json"))
            .to("direct:jsonChangeProfitCenter")
        .otherwise()
            .transform(simple("Incorrect message type JMSType = '${header.JMSType}'"))
    .end()
    .end();

from("direct:jsonChangeProfitCenter")
    .unmarshal().json(JsonLibrary.Jackson, Request.class)
    .log(LoggingLevel.INFO, "unmarshal json : ${body}")
    .bean(testService, "changeProfitCenter")
    .log(LoggingLevel.INFO, "service response : ${body}")
    .end();

from("direct:xmlChangeProfitCenter")
    .unmarshal().jaxb("model")
    .log(LoggingLevel.INFO, "unmarshal xml : ${body}")
    .bean(testService, "changeProfitCenter")
    .log(LoggingLevel.INFO, "service response : ${body}")
    .end();
}
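When I send a malformed message and a ParseException is caught, I see the following behavior: the message is transformed, logged, and sent to GENERAL.ERRORS. After 20 seconds the message disappears from the queue and is moved to the DLQ, and I see this stack trace in my application.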
12:10:51.637 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] WARN o.a.c.c.j.r.TemporaryQueueReplyManager - Timeout occurred after 20000 millis waiting for reply message with correlationID [Camel-ID-1521796109842-0-4] on destination temp-queue://ID:1521796109515-1:1:1. Setting ExchangeTimedOutException on (MessageId: ID:38611-1521795301468-8:2:1:1:2 on ExchangeId: ID-1521796109842-0-3) and continue routing.
12:10:51.683 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - Exception occurred while trying to handle previously thrown exception on exchangeId: ID-1521796109842-0-3 using: [Pipeline[[Channel[Transform(Simple: Message:
${body}
Headers:
${headers}
Exception:
${exception.stacktrace})], Channel[Log(errors)[body]], Channel[sendTo(activemq://queue:GENERAL.ERRORS)]]]]. The previous and the new exception will be logged in the following.
12:10:51.683 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - \--> Previous exception on exchangeId: ID-1521796109842-0-3
java.io.IOException: javax.xml.bind.UnmarshalException
- with linked exception:
12:10:51.698 [Camel (camel-1) thread #6 - JmsReplyManagerOnTimeout[GENERAL.ERRORS]] ERROR o.a.c.p.FatalFallbackErrorHandler - \--> New exception on exchangeId: ID-1521796109842-0-3
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID--1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.base/java.lang.Thread.run(Thread.java:844)
12:10:51.698 [Camel (camel-1) thread #1 - JmsConsumer[changeProfitCenter.input]] WARN o.a.c.c.jms.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1831)
at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:195)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:116)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-1521796109842-0-4 not received on destination: temp-queue://ID:54045-1521796109515-1:1:1. Exchange[ID-1521796109842-0-3]
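I have a few questions.
1) Why is the message not delivered to the client? If I remove .to("activemq:queue:GENERAL.ERRORS"), it is delivered to the client. But I want to send it to both the client and the error queue.
2) Why do I get an ExchangeTimedOutException when JMSExpiration = 0?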
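Can someone help me send the exception to both the reply queue and the error queue? My end goal is to send the message to the error queue in one format and to the reply queue in a different format. I know about multicast, but it sends the same message to multiple queues, which is not what I want.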
1 Answer
You can create an errorHandler and send the message to your queue. I once did this with RabbitMQ, and it looked like this:
If it helps, here is a project of mine that uses a queue.
https://github.com/vtripeno/integration_camel_apis
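For the ActiveMQ setup in the question, the handler could look roughly like the sketch below. It is not taken from the linked project. It assumes Camel 2.x with camel-jms, and the class name ErrorHandlingRoutes and the short reply text are made up for illustration. The likely cause of the timeout is that an incoming message carrying JMSReplyTo is processed as a request/reply (InOut) exchange, so .to("activemq:queue:GENERAL.ERRORS") also performs a request/reply and waits for an answer on a temporary queue. The 20 seconds match the JMS component's default requestTimeout of 20000 ms, which is independent of JMSExpiration. Sending to the error queue with the InOnly pattern avoids the wait, and transforming the body again afterwards gives the client a different format.

```java
import org.apache.camel.ExchangePattern;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;

// Hypothetical class name; only the onException block differs from the question.
public class ErrorHandlingRoutes extends RouteBuilder {

    @Override
    public void configure() {
        onException(Exception.class)
            // Mark the exception as handled so the final body is returned as the
            // reply instead of the failure propagating to the JMS listener.
            .handled(true)
            // Format 1: detailed report for the error queue (same as the question).
            .transform().simple("Message:\n${body}\nHeaders:\n${headers}\nException:\n${exception.stacktrace}")
            .log(LoggingLevel.ERROR, "errors", "${body}")
            // Fire-and-forget send: with InOnly the producer does not wait for a reply.
            .to(ExchangePattern.InOnly, "activemq:queue:GENERAL.ERRORS")
            // Format 2: short text for the client (assumed wording). Whatever body is
            // left at the end of the handler is what goes back via JMSReplyTo.
            .transform().simple("Request failed: ${exception.message}");

        // The from(...) routes from the question stay unchanged and go here.
    }
}
```

Appending ?exchangePattern=InOnly to the endpoint URI should have the same effect as passing ExchangePattern.InOnly to to(...), if you prefer to keep the pattern in the URI.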