首页 文章

Spring Kafka中的事务同步

提问于
浏览
3

我想将kafka事务与存储库事务同步:

@Transactional
public void syncTransaction(){
  myRepository.save(someObject)
  kafkaTemplate.send(someEvent)
}

由于合并(https://github.com/spring-projects/spring-kafka/issues/373)并且根据doc这是可能的 . 然而,我有理解和实现该功能的问题 . 查看https://docs.spring.io/spring-kafka/reference/htmlsingle/#_transaction_synchronization中的示例我必须创建一个MessageListenerContainer来监听我自己的事件 . 我还需要使用KafkaTemplate发送我的活动吗? MessageListenerContainer是否禁止向代理发送?

如果我理解正确kafkaTemplate和kafkaTransactionManager必须使用相同的producerFactory,我必须在其中启用Transaction设置transactionIdPrefix . 在我的示例中,我必须将messageListenerContainer的TransactionManager设置为DataSourceTransactionManager . 那是对的吗?

从我的角度来看,我通过kafkaTemplate发送一个事件看起来很奇怪,听我自己的事件并再次使用kafkaTemplate转发事件 .

如果我能得到一个kafka事务与存储库事务和解释的简单同步的示例,我真的会帮助我 .

2 回答

  • 3

    如果监听器容器配置了 KafkaTransactionManager ,容器将创建一个 生产环境 者,该 生产环境 者将被任何下游kafka模板使用,容器将为您发送偏移量 .

    如果容器具有其他事务管理器,则容器无法发送偏移量,因为它无权访问 生产环境 者(或模板) .

    另一个解决方案是使用 @Transactional (使用数据源TM)注释您的方法,并使用kafka TM配置容器 .

    这样,你的DB tx将在线程返回容器之前提交,然后容器将偏移量发送到kafka事务并提交它 .

    有关示例,请参阅the framework test cases .

  • 1

    @Eike Behrends有一个db kafka事务,你可以使用 ChainedTransactionManager 并以这种方式定义它:

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());;
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }
    
    
    @Bean
    @Primary
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
    
    @Bean(name = "chainedTransactionManager")
    public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                               KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }
    

    您需要注释事务性数据库kafka方法 @Transactional("chainedTransactionManager")

    (你可以看看spring-kafka项目的问题:https://github.com/spring-projects/spring-kafka/issues/433

    你说 :

    从我的角度来看,通过kafkaTemplate发送事件看起来很奇怪,听我自己的事件并再次使用kafkaTemplate转发事件 .

    你试过这个吗?如果可以,请提供一个例子吗?

相关问题