问题

我对卡夫卡比较新。我已经做了一些实验,但有一些事情我不清楚消费者抵消。根据我的理解,当消费者开始时,它将开始读取的偏移量由配置设置auto.offset.reset确定(如果我错了,请纠正我)。

现在说,例如,主题中有10条消息(偏移0到9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消耗了其中的5条消息。然后说我重启那个消费者流程。我的问题是:

如果auto.offset.reset设置为smallest,它是否总是从偏移量0开始消耗?

如果auto.offset.reset设置为largest,它是否会从偏移量5开始消耗?

关于这种情况的行为总是确定性的吗?如果我的问题中的任何内容不清楚,请不要犹豫。提前致谢。


#1 热门回答(163 赞)

它比你描述的要复杂一点。如果你的使用者组没有在某处提交有效的偏移量,那么auto.offset.resetconfig仅会启动(2个支持的偏移存储现在是Kafka和Zookeeper)。它还取决于你使用的消费者类型。

如果你使用高级java使用者,那么请想象以下场景:

  • 你在消费者群组1中有一个消费者已经消耗了5条消息并且已经死亡。下次启动此消费者时,它甚至不会使用该auto.offset.reset配置,并将从它死亡的地方继续,因为它只是从偏移存储(Kafka或ZK,如我所述)获取存储的偏移量。
  • 你在主题中有消息(如你​​所述),并在新的消费者组group2中启动消费者。在任何地方都没有存储偏移量,这次auto.offset.reset配置将决定是从主题的开头(最小)开始还是从主题的结尾开始(最大)

影响什么偏移值将对应于smallestlargestconfigs的另一件事是日志保留策略。想象一下,你的主题保留配置为1小时。你生成5条消息,然后一小时后再发布5条消息。 largestoffset仍将保持与前一示例相同,但2886741859one将无法成为0,因为Kafka已经删除了这些消息,因此最小的可用偏移量将为5

上面提到的所有内容都与SimpleConsumer无关,每次运行时,都会决定从哪里开始使用auto.offset.resetconfig。


#2 热门回答(41 赞)

只是一个更新:从Kafka 0.9开始,Kafka正在使用消费者的新Java版本,并且auto.offset.reset参数名称已更改;从手册:

如果Kafka中没有初始偏移量或服务器上当前偏移量不再存在(例如因为该数据已被删除)该怎么办:最早:自动将偏移量重置为最早的最近偏移量:自动重置偏移到最新的偏移量:如果没有为消费者的组找到任何其他偏移量,则向消费者抛出异常:向消费者抛出异常。

在检查接受的答案后我花了一些时间来找到这个,所以我认为社区发布它可能是有用的。


#3 热门回答(3 赞)

还有更多的offsets.retention.minutes。如果自上次提交以来的时间是> offsets.retention.minutes,那么auto.offset.reset也会开始


原文链接