首页 文章

Kafka Consumer收到的消息很少(不是全部)(之前已经处理过)

提问于
浏览
1

我们将主题保留设置为7天(168小时) . 当 生产环境 者发送消息时,消息实时消耗 . 一切都按预期工作 . 但是最近在 生产环境 服务器上,作为OS补丁的一部分,Devops意外地将时区从PST改为EST .

在Kafka服务器重新启动之后,我们看到消费者消耗的旧消息很少(不是全部,而是随机) . 我们要求Devops将其更改回PST并重新启动 . 同样,旧消息也在本周末重新出现 .

我们在较低的环境(Dev,QA,Stage等)中没有看到这个问题 .

Kafka 版:kafka_2.12-0.11.0.2

任何帮助都非常感谢 .

添加更多信息...最近我们的CentOS有一个补丁更新,不知何故,管理员从PST时区更改为EST并启动Kafka服务器...之后我们的消费者开始看到来自偏移0的消息 . 调试后,我发现时区变化和管理员在4天后从EST更改回PST . 我们的消息制作者在定期更改时区之前和之后发送消息 . 在从EST返回到PST的时区后,Kafka服务器重新启动,我看到了轰鸣声警告 .

这个日志发生在我们从EST切换回PST时:( server.log)[2018-06-13 18:36:34,430] WARN由于需求失败而发现损坏的索引文件:找到损坏的索引,索引文件(/ app / kafka_2.12-0.11.0.2 / data / __ consumer_offsets-21 / 00000000000000002076.index)具有非零大小,但最后一个偏移量为2076,不大于基本偏移量2076.} . 删除/app/kafka_2.12-0.11.0.2/data/__consumer_offsets-21/00000000000000002076.timeindex,/app/kafka_2.12-0.11.0.2/data/__consumer_offsets-21/00000000000000002076.index,/app/kafka_2.12 -0.11.0.2 / data / __ consumer_offsets-21 / 00000000000000002076.txnindex和重建索引...(kafka.log.Log)

经过3天的时区更改从EST返回到PST后,我们重新启动了消费者,并开始再次看到偏移量为0的消费者消息 .

1 回答

  • 0

    我想这是因为你会在 Commit 新的偏移之前重启程序 .

    管理偏移对于每个使用者组,Kafka维护每个正在使用的分区的已提交偏移量 . 当使用者处理消息时,它不会将其从分区中删除 . 相反,它只是使用一个名为提交偏移量的过程来更新其当前偏移量 . 如果消费者在处理消息之后但在提交其偏移之前失败,则提交的偏移信息将不反映消息的处理 . 这意味着该组中的下一个使用者将再次处理该消息以分配该分区 . 自动提交偏移量提交偏移量的最简单方法是让Kafka消费者自动执行此操作 . 这很简单,但它比手动提交控制更少 . 默认情况下,消费者每5秒自动提交一次偏移 . 无论消费者在处理消息方面取得了哪些进展,此默认提交每5秒发生一次 . 此外,当消费者调用poll()时,这也会导致从前一次调用poll()返回的最新偏移量被提交(因为它可能已被处理) . 如果提交的偏移量超过了消息的处理并且存在消费者故障,则可能无法处理某些消息 . 这是因为处理在提交的偏移量处重新开始,该偏移量晚于失败之前要处理的最后一条消息 . 因此,如果可靠性比简单性更重要,通常最好手动提交偏移量 . 手动提交偏移量如果enable.auto.commit设置为false,则消费者手动提交其偏移量 . 它可以同步或异步执行此操作 . 常见的模式是基于周期性计时器提交最新处理的消息的偏移量 . 此模式意味着每条消息至少处理一次,但提交的偏移量永远不会超过正在处理的消息的进度 . 周期性计时器的频率控制消费者故障后可以重新处理的消息数 . 当应用程序重新启动或组重新 balancer 时,将从上次保存的已提交偏移量中再次检索消息 . 提交的偏移量是恢复处理的消息的偏移量 . 这通常是最近处理的消息的偏移加一 .

    this article,我认为这是非常有帮助的 .

相关问题