首页 文章

对于某个主题的某些分区,kafka偏移和滞后是未知的

提问于
浏览
1

enter image description here

我正在使用https://github.com/confluentinc/confluent-kafka-go的消费者 . kafka版本是0.10.1.0 .

这是我的消费者的配置:

kafkaClient, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":               broker,
    "group.id":                       "udwg20",
    "session.timeout.ms":              60000,
    "go.events.channel.enable":        true,
    "go.application.rebalance.enable": true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset":      "earliest",
        "enable.auto.commit":      true,
        "auto.commit.interval.ms": 10000}})

开始时,显示了所有当前偏移和滞后,但在运行几个小时后,某些分区(未收到任何新消息)的偏移和滞后变得未知 . 如果有一条消息来到分区,其偏移和滞后未知,则偏移和滞后将再次可见,并且消息将被消耗 .

当存在一些具有未知电流偏移和滞后的分区时,我重启消费者,此时,所有当前具有未知电流偏移和滞后状态的分区将从开始时再次消耗,但其他分区似乎正常运行 .

我还使用了一个python使用者,它使用不同的使用者组ID来消息来自该主题的消息 . python使用者似乎运行良好,没有任何具有未知当前偏移和滞后的分区 .

2 回答

  • 0

    offsets.retention.minutes 用于清理非活动的消费者组 . 如果消费者组没有为 offsets.retention.minutes (默认为24h)提交任何偏移量,则kafka将清除其偏移量 . 这就是偏移和日志设置为 unknown 的原因 .

    您可以增加偏移保留期,但请注意,旧消费者将在 __consumer_offsets 主题中保留空间 .

  • 0

    我使用命令bellow来查看我的使用者组ID的偏移量是否定期提交 .

    echo exclude.internal.topics=false > consumer.properties
    
    kafka-console-consumer --consumer.config consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
    

    虽然我将 enable.auto.commit 的值设置为 true ,但它不会定期为其 lag = 0 的分区提交 . 即使消费者组仍处于活动状态,这些分区的当前偏移量也会在2到3小时后被删除 .

    为了解决这个问题,我将 enable.auto.commit 设置为 false 并编写我自己的函数以在每5秒后提交偏移量 .

    这是理想的:当消费者获取新的 Message 事件或到达分区结束( PartitionEOF )事件时,从事件的数据中,我将最新的当前偏移量保存在提交映射中(键: topic_partition 值: kafka.TopicPartition{ Topic, Partition, Offset } )并且是一个定期提交此映射的函数(可能在每5秒后) . 当消费者获得 RevokedPartitions 事件时,我从提交映射中删除相应的键 topic_partition .

相关问题