我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量 . 但是我无法让它发挥作用 .
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
但我得到的输出是
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
我有一个使用 assign
的替代方法,但结果是一样的
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
con.assign(ps)
for p in ps:
print "For partition %s highwater is %s"%(p.partition,con.highwater(p))
print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
从某些文档中可以看出,如果尚未发布 fetch
,我可能会遇到此行为 . 但是我找不到强迫它的方法 . 我究竟做错了什么?
或者是否有不同/更简单的方法来获取主题的最新偏移量?
5 回答
最后花了一天时间和几次错误的开始,我找到了一个解决方案,让它工作 . 发布给她以便其他人可以参考它 .
如果您希望使用kafka / bin中的Kafka shell脚本,则可以使用kafka-run-class.sh获取最新和最小的偏移量 .
获取最新的偏移命令将如下所示
获得最小偏移量命令将如下所示
您可以在以下链接中找到有关Get Offsets Shell的更多信息link
希望这可以帮助!
实现此目的的另一种方法是轮询消费者以获得最后消耗的偏移量,然后使用seek_to_end方法获得最新的可用偏移量分区 .
使用消费者群体时,此方法特别有用 .
来源:
https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end
使用
kafka-python>=1.3.4
,您可以使用:kafka.KafkaConsumer.end_offsets(partitions)