REF:Restarting a Kafka (python) consumer consumes all the messages in the queue again
我是kafka的新手,我也在尝试处理抵消管理 .
使用最新版本的Apache-Kafka(0.8.1.1 . )和pypi安装的kafka-python 0.9.2(2014-08-27上次上传),这与github上的当前主分支不同 .
使用“SimpleConsumer”进行测试时=>崩溃并重新启动脚本会消耗上次已知偏移量的消息 .
当使用“MultiProcessConsumer”进行测试=>崩溃并重新启动脚本时,将从偏移量“0”重新开始消耗
我的小脚本(MultiProcessConsumer):
from kafka import KafkaClient, MultiProcessConsumer
KFK = KafkaClient("localhost:9092")
consumer = MultiProcessConsumer(KFK, "my-group1", "my-topic", num_procs=2)
我可以通过以下方式检查抵消:
consumer.offsets
{0: 0, 1: 0}
然后,我运行:
A = consumer.get_messages(count=1235)
consumer.offsets
{0: 1235, 1: 0}
在崩溃并再次重新启动脚本之后,首先调用“consumer.offsets”会返回“{0:1235,1:0}”,这很好 . 但是跑步:
A.consumer.get_messages(count=388)
consumer.offsets
{0: 388, 1: 0}
有关如何处理这个问题的任何想法?此外,无论如何都要正确地改变MultiProcessConsumer偏移量从定义的位置开始?
谢谢你的帮助 .
Edit : 潜入kafka-python lib源并检查GitHub上的问题后,请参阅:https://github.com/mumrah/kafka-python/issues/173
所以问题是当master multiprocessconsumer启动子进程时,它会在主题的每个分区上将它们的偏移量设置为“0”(因为子进程的autocommit设置为false),而不是给它们正确的值 .
请参阅GitHub上的“mahall”评论 .
1 回答
这取决于消费者请求kafka经纪人抵消的方式 . 很可能你在Java中做了与此相同的事情
尝试这样的事情