我正在使用Kafka流1.0与kafka broker 1.0.1的无状态处理器
问题是,CustomProcessor每隔几秒就关闭一次,这导致重新 balancer 信号,我正在使用以下配置:
session.timeout.ms=15000
heartbeat.interval.ms=3000
//将其设置为1/3 session.timeout
max.poll.interval.ms=Integer.MAX_VALUE
//使它变得那么大,因为我正在进行密集的计算操作,可能需要多达10分钟处理1 kafka消息(NLP操作)
max.poll.records=1
尽管有这种配置和我对kafka超时配置如何工作的理解,我看到消费者每隔几秒重新 balancer 一次 .
我已经阅读了下面的文章和其他stackoverflow问题 . 关于如何调整长时间操作并避免非常长的会话超时会使故障检测这么晚,但是我仍然会看到意外的行为,除非我误解了一些事情 .
Diff between session.timeout.ms and max.poll.interval
Kafka kstreams processing timeout
对于消费者环境设置,我有8台机器,每个16个代码,并且从1个主题消耗100个分区,我遵循这个融合的实践doc here推荐 .
有什么指针吗?
1 回答
我想到了 . 经过大量调试并为kafka流客户端和代理启用详细日志记录后,结果发现有两件事:
流1.0.0(HERE)中存在严重错误,因此我将客户端版本从1.0.0升级到1.0.1
我将消费者属性
default.deserialization.exception.handler
的值从org.apache.kafka.streams.errors.LogAndFailExceptionHandler
更新为org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
.经过上述2次更改后,一切都非常完美,没有重启,我使用grafana来监控重启,在过去48小时内,没有一次重启 .
我可能会做更多的故障排除,以确保上面的两个项目中的哪一个能够真正解决,但是我急于部署到 生产环境 中,所以如果有人有兴趣从那里开始,请继续,否则,一旦我有时间,做进一步分析并更新答案!
很高兴能解决这个问题!