首页 文章

Spring Integration AMQP - 偶尔没有消息,需要超时吗?

提问于
浏览
1

我有一个入站RabbitMQ通道适配器,每天成功处理3000条消息,但偶尔我会在RabbitMQ管理控制台中看到未分组的消息计数为1 . 这似乎仍然是这样 .

我确实有一个重试建议链重试3次,然后通过死信路由密钥转移到DLQ,这对大多数例外都很好 .

在过去的几周内,unacked发生了两次,有一次我能够进行一次线程转储,看到int-http:outbound-gateway调用卡在等待http响应getStatusCode()

我在int-amqp:inbound-channel-adapter上有一个receive-timeout =“59000”,我希望它会在超过超时的任何地方超时?

我现在注意到int-http:outbound-gateway上有一个reply-timeout属性我应该设置吗?

任何想法赞赏?

<int-amqp:inbound-channel-adapter id="amqpInCdbEvents"  channel="eventsAMQPChannel" channel-transacted="true"  transaction-manager="transactionManager" 
            queue-names="internal.events.queue" connection-factory="connectionFactory" 
            receive-timeout="59000" concurrent-consumers="${eventsAMQPChannel.concurrent-consumers}" 
            advice-chain="retryChain" auto-startup="false" /> 

     <int:channel id="eventsAMQPChannel" />

     <!--  CHAIN of processing for Asynch Processing of Events from intermediate Queue   -->
    <int:chain id="routeEventChain" input-channel="eventsAMQPChannel">
            <int:json-to-object-transformer type="xx.xx.xx.json.Event" object-mapper="springJacksonObjectMapper"/>
            <int:header-enricher>
                  <int:header name="originalPayload" expression="payload" overwrite="true"/>
                  <int:header name="message_id"      expression="payload.id" overwrite="true"/>
            </int:header-enricher>  
            <int:router expression="payload.eventType">
                <int:mapping value="VALUE"              channel="valueEventChannel"/>
                <int:mapping value="SWAP"               channel="swapEventChannel"/>
            </int:router>
    </int:chain>

    <int:channel id="valueEventChannel" />
    <int:channel id="swapEventChannel" />

    <int:chain id="valueEventChain" input-channel="valueEventChannel" output-channel="nullChannel">
            <int:transformer ref="syncValuationTransformer" />
            <int:object-to-json-transformer object-mapper="springJacksonObjectMapper" />
            <int:header-enricher>
                     <int:header name="contentType" value="application/json;charset=UTF-8" overwrite="true"/>
            </int:header-enricher>                  
            <int-http:outbound-gateway id="httpOutboundGatewayValuationServiceFinalValuation"                                                           
                    expected-response-type="java.lang.String"
                    http-method="POST"      charset="UTF-8"
                    extract-request-payload="true"                                          
                    url="${value.service.uri}/value"/>
    </int:chain>

1 回答

  • 0

    reply-timeout 是将回复发送到回复通道时的超时(如果它可以阻止 - 例如已满的有界队列通道) .

    int-http:outbound-gateway调用卡在等待http响应getStatusCode()

    您可以在 ClientHttpRequestFactory 上设置客户端超时( readtimeout ),您可以将其配置到出站适配器中...

    /**
     * Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
     * @param requestFactory HTTP request factory to use
     * @see org.springframework.http.client.SimpleClientHttpRequestFactory
     * @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
     */
    public RestTemplate(ClientHttpRequestFactory requestFactory) {
        this();
        setRequestFactory(requestFactory);
    }
    

相关问题