我有一个像这样定义的Spring集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.acknowledgeMode(MANUAL)
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
并且服务激活器定义如下:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(List<> Collection<MyEvent> events) {
....
}
}
我要做的是更改 myMethod
以获取与聚合中每条消息关联的特定标头的列表 . 在我的情况下,我想为每条消息保留 AmqpHeaders.CHANNEL
和 AmqpHeaders.DELIVERY_TAG
,这样我就可以为RabbitMQ的每条消息执行ACK或NACK(注意'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK'执行后'm deliberately using manual acknowledgement mode in the IntegrationFlow since I want to send the ACK' s / NACK) .
我试过这种方法:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(@Header(AmqpHeaders.CHANNEL) List<Channel> channels,
@Header(AmqpHeaders.DELIVERY_TAG) List<Long> tags,
Collection<MyEvent> events) {
....
}
}
但在这里我似乎只获得最后一条消息的 Headers 值(即 channels
和 tags
总是大小为1,即使 events
集合中有多个事件) .
我也尝试将 Collection<MyEvent>
更改为 Collection<Message>
( org.springframework.messaging.Message
)以尝试手动提取 Headers ,但这会失败:
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: com.x.y.MyEvent cannot be cast to org.springframework.messaging.Message
因为消息已经被IntegrationFlow中定义的消息转换器转换 .
我怎样才能做到这一点?
1 回答
您需要在
aggregator
上自定义outputProcessor
以返回所需的消息( Headers 中的 Channels /传递标记列表和有效负载中的事件列表) .