我有一个Spring Cloud Stream应用程序,它使用Rabbit Binder从RabbitMQ接收事件 . 我的申请可以概括为:
@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
// Transform events and store them in MongoDB using
// spring-boot-data-mongodb-reactive
...
}
问题是 @Transactional
似乎不适用于Spring Cloud Stream(或者至少's my impression) since if there'是一个例外,当写入MongoDB时,事件似乎已经被ack:ed到RabbitMQ并且操作没有重试 .
鉴于我希望实现与使用 @Transactional
围绕函数 @Transactional
时基本相同的功能:
-
使用带有Rabbit Binder的Spring Cloud Stream时,是否必须手动向RabbitMQ确认消息?
-
如果是这样,我该如何实现?
1 回答
这里有几个问题 .
确认消息不需要事务
基于Reactor的
@StreamListener
方法只调用一次,只是为了设置Flux
所以@Transactional
对该方法没有意义 - 消息然后流过通量,因此任何与各个消息有关的事情都必须在通量的上下文中完成 .Spring Transactions绑定到线程 - Reactor是非阻塞的;消息将在第一次切换时被激活 .
是的,你需要使用手动的ack;大概就是mongodb商店运营的结果 . 您可能需要使用
Flux<Message<Event>>
,这样您就可以访问 Channels 和投递标签 Headers .