首页 文章

Spring与AMQP集成:在错误通道上获取原始消息的自定义标头

提问于
浏览
0

在我的项目中使用Spring Integration与RabbitMQ我遇到了一个问题 . 该项目包括从队列接收消息,跟踪传入消息,使用服务激活器处理消息,以及跟踪响应或服务激活器抛出的异常 . 以下是示例配置:

<!-- inbound-gateway -->
<int-amqp:inbound-gateway id="inboundGateway"
         request-channel="gatewayRequestChannel"
         queue-names="myQueue"
         connection-factory="rabbitMQConnectionFactory"
         reply-channel="gatewayResponseChannel"
         error-channel="gatewayErrorChannel"
         error-handler="rabbitMQErrorHandler"
         mapped-request-headers="traceId"
         mapped-reply-headers="traceId" />

<!-- section to dispatch incoming messages to trace and execute service-activator -->
<int:publish-subscribe-channel id="gatewayRequestChannel" />
<int:bridge input-channel="gatewayRequestChannel" output-channel="traceChannel"/>
<int:bridge input-channel="gatewayRequestChannel" output-channel="serviceActivatorInputChannel"/>

<!-- the trace channel-->
<int:logging-channel-adapter id="traceChannel" 
    expression="headers['traceId'] + '= [Headers=' + headers + ', Payload=' + payload+']'" logger-name="my.logger" level="DEBUG" />

<!-- service activator which may throw an exception -->
<int:service-activator ref="myBean" method="myMethod" input-channel="serviceActivatorInputChannel" output-channel="serviceActivatorOutputChannel"/>

<!-- section to dispatch output-messages from service-activator to trace them and return them to the gateway -->
<int:publish-subscribe-channel id="serviceActivatorOutputChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
    output-channel="traceChannel" />
<int:bridge input-channel="serviceActivatorOutputChannel"
    output-channel="gatewayResponseChannel" />

<!-- section to dispatch exceptions from service-activator to trace them and return them to the gateway -->
<int:bridge input-channel="gatewayErrorChannel"
    output-channel="traceChannel" />
<int:bridge input-channel="gatewayErrorChannel"
    output-channel="gatewayResponseChannel" />

我简化了代码以适合我的解释 . 我们的想法是跟踪进出服务激活器的输入和输出/错误消息 . 为此,我使用名为traceId的消息头 . 此标识符用作相关标识符,以便能够将请求消息与其响应相关联(这两个消息共享相同的traceId值) .

当服务激活器没有抛出异常时,一切正常 . 但是当抛出异常时,网关似乎会生成一条新消息,而没有我原来的traceId标头 .

看一下网关代码,我在类org.springframework.integration.gateway.MessagingGatewaySupport中找到以下代码:

private Object doSendAndReceive(Object object, boolean shouldConvert) {
...
    if (error != null) {
                if (this.errorChannel != null) {
                    Message<?> errorMessage = new ErrorMessage(error);
                    Message<?> errorFlowReply = null;
                    try {
                        errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
                    }
...
}

看来,当发生异常时,会创建一条新消息,并将异常消息作为有效负载发送到网关的errorChannel . 这是我松散自定义 Headers 的地方 .

有没有办法在发生异常时保留我的自定义标头? (也许有一种方法来配置它,我可能会错过它...) . 或许我没有以正确的方式实施我的流程 . 如果是这种情况,欢迎提出任何意见或建议 .

顺便说一下,我正在使用spring-integration-core artifact的4.0.3.RELEASE版本 .

谢谢你的回答

编辑:正如Gary Russel所说,这个例子缺少以下puslish / subscribe队列配置

<int:publish-subscribe-channel id="gatewayErrorChannel"/>

1 回答

  • 2

    error-channel 上的消息是 ErrorMessage . 它有两个属性: cause - 原始异常和 failedMessage - 失败时的消息 . ErrorMessage 没有得到 failedMessage 的 Headers .

    你不能只是在没有额外工作的情况下将 ErrorMessage 发送回网关 .

    通常,错误流将在返回响应之前执行一些错误分析 .

    如果要还原某些自定义标头,则需要在错误流上添加标头 .

    就像是

    <int:header-enricher ...>
        <int:header name="traceId" expression="payload.failedMessage.headers.traceId" />
    </int:header-enricher>
    

    此外,您的配置有点奇怪,因为您在 gatewayErrorChannel 上有2个订阅者 . 除非它是 <publish-subscribe-channel/> ,否则这些消费者将得到替代消息;看起来你希望它们都得到它所以你需要正确地声明通道 .

相关问题