问题
我对卡夫卡比较新。我已经做了一些实验,但有一些事情我不清楚消费者抵消。根据我的理解,当消费者开始时,它将开始读取的偏移量由配置设置auto.offset.reset
确定(如果我错了,请纠正我)。
现在说,例如,主题中有10条消息(偏移0到9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消耗了其中的5条消息。然后说我重启那个消费者流程。我的问题是:
如果auto.offset.reset
设置为smallest
,它是否总是从偏移量0开始消耗?
如果auto.offset.reset
设置为largest
,它是否会从偏移量5开始消耗?
关于这种情况的行为总是确定性的吗?如果我的问题中的任何内容不清楚,请不要犹豫。提前致谢。
#1 热门回答(163 赞)
它比你描述的要复杂一点。如果你的使用者组没有在某处提交有效的偏移量,那么auto.offset.reset
config仅会启动(2个支持的偏移存储现在是Kafka和Zookeeper)。它还取决于你使用的消费者类型。
如果你使用高级java使用者,那么请想象以下场景:
- 你在消费者群组1中有一个消费者已经消耗了5条消息并且已经死亡。下次启动此消费者时,它甚至不会使用该auto.offset.reset配置,并将从它死亡的地方继续,因为它只是从偏移存储(Kafka或ZK,如我所述)获取存储的偏移量。
- 你在主题中有消息(如你所述),并在新的消费者组group2中启动消费者。在任何地方都没有存储偏移量,这次auto.offset.reset配置将决定是从主题的开头(最小)开始还是从主题的结尾开始(最大)
影响什么偏移值将对应于smallest
和largest
configs的另一件事是日志保留策略。想象一下,你的主题保留配置为1小时。你生成5条消息,然后一小时后再发布5条消息。 largest
offset仍将保持与前一示例相同,但2886741859one将无法成为0
,因为Kafka已经删除了这些消息,因此最小的可用偏移量将为5
。
上面提到的所有内容都与SimpleConsumer
无关,每次运行时,都会决定从哪里开始使用auto.offset.reset
config。
#2 热门回答(41 赞)
只是一个更新:从Kafka 0.9开始,Kafka正在使用消费者的新Java版本,并且auto.offset.reset参数名称已更改;从手册:
如果Kafka中没有初始偏移量或服务器上当前偏移量不再存在(例如因为该数据已被删除)该怎么办:最早:自动将偏移量重置为最早的最近偏移量:自动重置偏移到最新的偏移量:如果没有为消费者的组找到任何其他偏移量,则向消费者抛出异常:向消费者抛出异常。
在检查接受的答案后我花了一些时间来找到这个,所以我认为社区发布它可能是有用的。
#3 热门回答(3 赞)
还有更多的offsets.retention.minutes。如果自上次提交以来的时间是> offsets.retention.minutes
,那么auto.offset.reset
也会开始