我有python脚本,我需要使用kafka1代理群集检索从kafka主题读取的一组使用者的当前使用者组偏移量 . 这些是本地kafka使用者,它们将偏移量存储在kafka集群中,而不是存储在zookeeper中 .
脚本本身不需要消费任何消息,只需读取其他消费者的当前偏移量 . 我意识到可以用 kafka-consumer-groups.sh
做到这一点,但理想情况下我想避免依赖shell命令 .
我已经可以使用 dpkp/kafka-python
客户端,但只能通过创建使用者并将其分配给组,然后通过取消分配某些分区来影响使用该组的现有使用者 . 我需要脚本完全被动,不执行任何会中断其他消费者的操作 .
2 回答
linkedin/kafka-tools
具有用于获取组偏移的函数get_offsets_for_group()
. 可以传递组名称和主题名称,或仅传递组名称以检索该组的所有主题的已提交偏移量 .使用
dpkp/kafka-python
,您可以通过发送OffsetFetchRequest
来检索特定组的已提交偏移量 . 如果使用OffsetFetchRequest_v3
,则可以为主题参数传递None
,以获取该组已存储偏移量的所有主题/分区的偏移量 .例如:
如果
mygroup
已为topic
和topic2
提交了偏移量,它将打印如下内容: