首页 文章

Spring Cloud Stream with RabbitMQ binder,如何应用@Transactional?

提问于
浏览
1

我有一个Spring Cloud Stream应用程序,它使用Rabbit BinderRabbitMQ接收事件 . 我的申请可以概括为:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
       // Transform events and store them in MongoDB using 
       // spring-boot-data-mongodb-reactive
       ...
}

问题是 @Transactional 似乎不适用于Spring Cloud Stream(或者至少's my impression) since if there'是一个例外,当写入MongoDB时,事件似乎已经被ack:ed到RabbitMQ并且操作没有重试 .

鉴于我希望实现与使用 @Transactional 围绕函数 @Transactional 时基本相同的功能:

  • 使用带有Rabbit Binder的Spring Cloud Stream时,是否必须手动向RabbitMQ确认消息?

  • 如果是这样,我该如何实现?

1 回答

  • 1

    这里有几个问题 .

    • 确认消息不需要事务

    • 基于Reactor的 @StreamListener 方法只调用一次,只是为了设置 Flux 所以 @Transactional 对该方法没有意义 - 消息然后流过通量,因此任何与各个消息有关的事情都必须在通量的上下文中完成 .

    • Spring Transactions绑定到线程 - Reactor是非阻塞的;消息将在第一次切换时被激活 .

    是的,你需要使用手动的ack;大概就是mongodb商店运营的结果 . 您可能需要使用 Flux<Message<Event>> ,这样您就可以访问 Channels 和投递标签 Headers .

相关问题