我使用Kafka消息代理和Spring Kafka .
我使用默认的自动确认模型 . 有时我的 @KafkaListener 需要约5分钟才能完成工作 .
@KafkaListener
我注意到,当它发生时,Kafka会产生重复的消息 . 是否有任何超时属性可以配置,以防止Kafka重复相同的消息,让 生产环境 者等待(至少例如10分钟),而消费者将完成工作?
生产环境 者和消费者脱钩 - 没有办法阻止 生产环境 者发送更多的消息,因为消费者很慢 .
我没有明确使用auto.commit . 我使用默认的Spring配置
默认情况下 auto.commit 是 true (在Kafka客户端中),因此spring下的默认行为是客户端执行自己的提交而不是容器 . 将其设置为 false 并将容器属性 AckMode 设置为 RECORD ,以便在侦听器退出后执行提交 .
auto.commit
true
false
AckMode
RECORD
有一个kafka consumer property max.poll.interval.ms ,默认为5分钟(300000) .
max.poll.interval.ms
使用消费者群组管理时poll()调用之间的最大延迟 . 这为消费者在获取更多记录之前可以闲置的时间量设置了上限 . 如果在此超时到期之前未调用poll(),则认为使用者失败,并且该组将重新 balancer 以便将分区重新分配给另一个成员 .
重新 balancer 将导致重新发送 . 您可以增加该属性以避免重新 balancer .
正如我所说,日志通常有帮助;你应该看到日志中发生的重新 balancer .
1 回答
生产环境 者和消费者脱钩 - 没有办法阻止 生产环境 者发送更多的消息,因为消费者很慢 .
默认情况下
auto.commit
是true
(在Kafka客户端中),因此spring下的默认行为是客户端执行自己的提交而不是容器 . 将其设置为false
并将容器属性AckMode
设置为RECORD
,以便在侦听器退出后执行提交 .有一个kafka consumer property
max.poll.interval.ms
,默认为5分钟(300000) .重新 balancer 将导致重新发送 . 您可以增加该属性以避免重新 balancer .
正如我所说,日志通常有帮助;你应该看到日志中发生的重新 balancer .