首页 文章

使用python库检索kafka中的使用者组偏移量

提问于
浏览
1

我有python脚本,我需要使用kafka1代理群集检索从kafka主题读取的一组使用者的当前使用者组偏移量 . 这些是本地kafka使用者,它们将偏移量存储在kafka集群中,而不是存储在zookeeper中 .

脚本本身不需要消费任何消息,只需读取其他消费者的当前偏移量 . 我意识到可以用 kafka-consumer-groups.sh 做到这一点,但理想情况下我想避免依赖shell命令 .

我已经可以使用 dpkp/kafka-python 客户端,但只能通过创建使用者并将其分配给组,然后通过取消分配某些分区来影响使用该组的现有使用者 . 我需要脚本完全被动,不执行任何会中断其他消费者的操作 .

2 回答

  • 1

    linkedin/kafka-tools 具有用于获取组偏移的函数 get_offsets_for_group() . 可以传递组名称和主题名称,或仅传递组名称以检索该组的所有主题的已提交偏移量 .

    from kafka.tools.client import Client
    
    group='mygroup'
    
    client=Client(broker_list='localhost:9029')
    client.connect()
    
    offsets=client.get_offsets_for_group(group)
    
    for topic in offsets:
      for partition_offset in offsets[topic].partitions:
        print("group: {0} - topic: {1} - partition: {2}".format(group,topic,partition_offset))
    
  • 3

    使用 dpkp/kafka-python ,您可以通过发送 OffsetFetchRequest 来检索特定组的已提交偏移量 . 如果使用 OffsetFetchRequest_v3 ,则可以为主题参数传递 None ,以获取该组已存储偏移量的所有主题/分区的偏移量 .

    例如:

    from kafka import BrokerConnection
    from kafka.protocol.commit import *
    import socket
    
    group = 'mygroup'
    
    bc = BrokerConnection('localhost', 9092, socket.AF_INET)
    bc.connect_blocking()
    
    fetch_offset_request = OffsetFetchRequest_v3(group, None)
    
    future = bc.send(fetch_offset_request)
    while not future.is_done:
        for resp, f in bc.recv():
            f.success(resp)
    
    for topic in future.value.topics:
        print('offsets for {0}'.format(topic[0]))
        for partition in topic[1]:
            print('- partition {0}, offset: {1}'.format(partition[0], partition[1]))
    

    如果 mygroup 已为 topictopic2 提交了偏移量,它将打印如下内容:

    offsets for topic2
    - partition 0, offset: 10
    - partition 1, offset: 10
    - partition 2, offset: 10
    offsets for topic
    - partition 0, offset: 3
    

相关问题