Code:
from kafka import KafkaConsumer
task_event = TaskEvent()
consumer = KafkaConsumer('test',bootstrap_servers=["localhost:9092"],group_id=None, auto_offset_reset='smallest')
for msg in consumer:
print msg.offset
Output:
0
1
2
.
.
16
我在主题 test
中共有16条消息 .
What determines Kafka consumer offset? - 根据接受的答案, You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer it won't even use that auto.offset.reset config and will continue from the place it died because it will just fetch the stored offset from the offset storage
根据Python API文档 - http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html, enable_auto_commit
默认为 True
,这意味着消费者的偏移应该在后台提交 . 但是当我停止运行多次以上时,我得到相同的输出,如果 auto_commit
默认是 True
(假设任何API的规则都相同,可能是Java或Python),这是预期的 .
谢谢 .
1 回答
根据documentation,
我上面的代码,组ID是
None
. 我更改了它并给出了一个组名,并且正在提交抵消 .