首页 文章

Spring Integration:获取聚合中涉及的所有头文件,而不仅仅是最后一个

提问于
浏览
1

我有一个像这样定义的Spring集成流程:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .acknowledgeMode(MANUAL)
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

并且服务激活器定义如下:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(List<> Collection<MyEvent> events) {
        ....
    }    
}

我要做的是更改 myMethod 以获取与聚合中每条消息关联的特定标头的列表 . 在我的情况下,我想为每条消息保留 AmqpHeaders.CHANNELAmqpHeaders.DELIVERY_TAG ,这样我就可以为RabbitMQ的每条消息执行ACK或NACK(注意'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK'执行后'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK' s / NACK) .

我试过这种方法:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels, 
                         @Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags, 
                         Collection<MyEvent> events) {
        ....
    }    
}

但在这里我似乎只获得最后一条消息的 Headers 值(即 channelstags 总是大小为1,即使 events 集合中有多个事件) .

我也尝试将 Collection<MyEvent> 更改为 Collection<Message>org.springframework.messaging.Message )以尝试手动提取 Headers ,但这会失败:

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message

因为消息已经被IntegrationFlow中定义的消息转换器转换 .

我怎样才能做到这一点?

1 回答

  • 1

    您需要在 aggregator 上自定义 outputProcessor 以返回所需的消息( Headers 中的 Channels /传递标记列表和有效负载中的事件列表) .

相关问题