我想将kafka事务与存储库事务同步:
@Transactional
public void syncTransaction(){
myRepository.save(someObject)
kafkaTemplate.send(someEvent)
}
由于合并(https://github.com/spring-projects/spring-kafka/issues/373)并且根据doc这是可能的 . 然而,我有理解和实现该功能的问题 . 查看https://docs.spring.io/spring-kafka/reference/htmlsingle/#_transaction_synchronization中的示例我必须创建一个MessageListenerContainer来监听我自己的事件 . 我还需要使用KafkaTemplate发送我的活动吗? MessageListenerContainer是否禁止向代理发送?
如果我理解正确kafkaTemplate和kafkaTransactionManager必须使用相同的producerFactory,我必须在其中启用Transaction设置transactionIdPrefix . 在我的示例中,我必须将messageListenerContainer的TransactionManager设置为DataSourceTransactionManager . 那是对的吗?
从我的角度来看,我通过kafkaTemplate发送一个事件看起来很奇怪,听我自己的事件并再次使用kafkaTemplate转发事件 .
如果我能得到一个kafka事务与存储库事务和解释的简单同步的示例,我真的会帮助我 .
2 回答
如果监听器容器配置了
KafkaTransactionManager
,容器将创建一个 生产环境 者,该 生产环境 者将被任何下游kafka模板使用,容器将为您发送偏移量 .如果容器具有其他事务管理器,则容器无法发送偏移量,因为它无权访问 生产环境 者(或模板) .
另一个解决方案是使用
@Transactional
(使用数据源TM)注释您的方法,并使用kafka TM配置容器 .这样,你的DB tx将在线程返回容器之前提交,然后容器将偏移量发送到kafka事务并提交它 .
有关示例,请参阅the framework test cases .
@Eike Behrends有一个db kafka事务,你可以使用
ChainedTransactionManager
并以这种方式定义它:您需要注释事务性数据库kafka方法
@Transactional("chainedTransactionManager")
(你可以看看spring-kafka项目的问题:https://github.com/spring-projects/spring-kafka/issues/433)
你说 :
你试过这个吗?如果可以,请提供一个例子吗?