首页 文章

如何获取kafka主题的分区的最新偏移量?

提问于
浏览
15

我正在为Kafka使用Python高级消费者,并希望了解主题的每个分区的最新偏移量 . 但是我无法让它发挥作用 .

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

但我得到的输出是

For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None

我有一个使用 assign 的替代方法,但结果是一样的

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()

从某些文档中可以看出,如果尚未发布 fetch ,我可能会遇到此行为 . 但是我找不到强迫它的方法 . 我究竟做错了什么?

或者是否有不同/更简单的方法来获取主题的最新偏移量?

5 回答

  • 14

    最后花了一天时间和几次错误的开始,我找到了一个解决方案,让它工作 . 发布给她以便其他人可以参考它 .

    from kafka import SimpleClient
    from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
    from kafka.common import OffsetRequestPayload
    
    client = SimpleClient(brokers)
    
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    
    offsets_responses = client.send_offset_request(offset_requests)
    
    for r in offsets_responses:
        print "partition = %s, offset = %s"%(r.partition, r.offsets[0])
    
  • 8

    如果您希望使用kafka / bin中的Kafka shell脚本,则可以使用kafka-run-class.sh获取最新和最小的偏移量 .

    获取最新的偏移命令将如下所示

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname
    

    获得最小偏移量命令将如下所示

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname
    

    您可以在以下链接中找到有关Get Offsets Shell的更多信息link

    希望这可以帮助!

  • 1
    from kafka import KafkaConsumer, TopicPartition
    
    TOPIC = 'MYTOPIC'
    GROUP = 'MYGROUP'
    BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']
    
    consumer = KafkaConsumer(
            bootstrap_servers=BOOTSTRAP_SERVERS,
            group_id=GROUP,
            enable_auto_commit=False
        )
    
    
    for p in consumer.partitions_for_topic(TOPIC):
        tp = TopicPartition(TOPIC, p)
        consumer.assign([tp])
        committed = consumer.committed(tp)
        consumer.seek_to_end(tp)
        last_offset = consumer.position(tp)
        print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed)))
    
    consumer.close(autocommit=False)
    
  • 0

    实现此目的的另一种方法是轮询消费者以获得最后消耗的偏移量,然后使用seek_to_end方法获得最新的可用偏移量分区 .

    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
    consumer.poll()
    consumer.seek_to_end()
    

    使用消费者群体时,此方法特别有用 .

    来源:

  • 29

    使用 kafka-python>=1.3.4 ,您可以使用:

    kafka.KafkaConsumer.end_offsets(partitions)

    获取给定分区的最后一个偏移量 . 分区的最后偏移是即将到来的消息的偏移,即最后可用消息1的偏移 .

    from kafka import TopicPartition
    from kafka.consumer import KafkaConsumer
    
    con = KafkaConsumer(bootstrap_servers = brokers)
    ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]
    
    con.end_offsets(ps)
    

相关问题