我正在使用https://github.com/confluentinc/confluent-kafka-go的消费者 . kafka版本是0.10.1.0 .
这是我的消费者的配置:
kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": "udwg20",
"session.timeout.ms": 60000,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"default.topic.config": kafka.ConfigMap{
"auto.offset.reset": "earliest",
"enable.auto.commit": true,
"auto.commit.interval.ms": 10000}})
开始时,显示了所有当前偏移和滞后,但在运行几个小时后,某些分区(未收到任何新消息)的偏移和滞后变得未知 . 如果有一条消息来到分区,其偏移和滞后未知,则偏移和滞后将再次可见,并且消息将被消耗 .
当存在一些具有未知电流偏移和滞后的分区时,我重启消费者,此时,所有当前具有未知电流偏移和滞后状态的分区将从开始时再次消耗,但其他分区似乎正常运行 .
我还使用了一个python使用者,它使用不同的使用者组ID来消息来自该主题的消息 . python使用者似乎运行良好,没有任何具有未知当前偏移和滞后的分区 .
2 回答
offsets.retention.minutes
用于清理非活动的消费者组 . 如果消费者组没有为offsets.retention.minutes
(默认为24h)提交任何偏移量,则kafka将清除其偏移量 . 这就是偏移和日志设置为unknown
的原因 .您可以增加偏移保留期,但请注意,旧消费者将在
__consumer_offsets
主题中保留空间 .我使用命令bellow来查看我的使用者组ID的偏移量是否定期提交 .
虽然我将
enable.auto.commit
的值设置为true
,但它不会定期为其lag = 0
的分区提交 . 即使消费者组仍处于活动状态,这些分区的当前偏移量也会在2到3小时后被删除 .为了解决这个问题,我将
enable.auto.commit
设置为false
并编写我自己的函数以在每5秒后提交偏移量 .这是理想的:当消费者获取新的
Message
事件或到达分区结束(PartitionEOF
)事件时,从事件的数据中,我将最新的当前偏移量保存在提交映射中(键:topic_partition
值:kafka.TopicPartition{ Topic, Partition, Offset }
)并且是一个定期提交此映射的函数(可能在每5秒后) . 当消费者获得RevokedPartitions
事件时,我从提交映射中删除相应的键topic_partition
.