首页 文章

重新启动Kafka python MultiProcessConsumer再次消耗队列中的所有消息

提问于
浏览
0

REF:Restarting a Kafka (python) consumer consumes all the messages in the queue again

我是kafka的新手,我也在尝试处理抵消管理 .

使用最新版本的Apache-Kafka(0.8.1.1 . )和pypi安装的kafka-python 0.9.2(2014-08-27上次上传),这与github上的当前主分支不同 .

使用“SimpleConsumer”进行测试时=>崩溃并重新启动脚本会消耗上次已知偏移量的消息 .

当使用“MultiProcessConsumer”进行测试=>崩溃并重新启动脚本时,将从偏移量“0”重新开始消耗

我的小脚本(MultiProcessConsumer):

from kafka import KafkaClient, MultiProcessConsumer
KFK = KafkaClient("localhost:9092")
consumer = MultiProcessConsumer(KFK, "my-group1", "my-topic", num_procs=2)

我可以通过以下方式检查抵消:

consumer.offsets
{0: 0, 1: 0}

然后,我运行:

A = consumer.get_messages(count=1235)
consumer.offsets
{0: 1235, 1: 0}

在崩溃并再次重新启动脚本之后,首先调用“consumer.offsets”会返回“{0:1235,1:0}”,这很好 . 但是跑步:

A.consumer.get_messages(count=388)
consumer.offsets
{0: 388, 1: 0}

有关如何处理这个问题的任何想法?此外,无论如何都要正确地改变MultiProcessConsumer偏移量从定义的位置开始?

谢谢你的帮助 .

Edit : 潜入kafka-python lib源并检查GitHub上的问题后,请参阅:https://github.com/mumrah/kafka-python/issues/173

所以问题是当master multiprocessconsumer启动子进程时,它会在主题的每个分区上将它们的偏移量设置为“0”(因为子进程的autocommit设置为false),而不是给它们正确的值 .

请参阅GitHub上的“mahall”评论 .

1 回答

  • 0

    这取决于消费者请求kafka经纪人抵消的方式 . 很可能你在Java中做了与此相同的事情

    readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    

    尝试这样的事情

    readOffset = getLastOffset(consumer,topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    

相关问题