我正在使用C kafka lib来生成/使用kafka消息 - 一切都很好 .
现在我想监视我的消费者以处理断线/故障 . 我正在寻找所有消费者的连接列表 .
来自kafka documentation:
Consumer Id Registry除了group_id由组中的所有使用者共享之外,每个使用者都被赋予一个瞬态的,唯一的consumer_id(主机名:uuid),用于识别目的 . 消费者ID在以下目录中注册 . / consumers / [group_id] / ids / [consumer_id] - > {“topic1”:#streams,...,“topicN”:#streams}(短暂节点)
组中的每个使用者都在其组下注册,并使用其consumer_id创建一个znode . znode的值包含<topic,#streams>的映射 . 该id仅用于标识组中当前活动的每个消费者 . 这是一个短暂的节点,因此如果消费者进程终止,它将消失 .
但是当试图 ls /consumers
时,那里什么都没有(我的应用程序启动并运行,消费者通过日志消费消息)
2 回答
在kafka 0.9集群中,只有
old-consumers(zookeeper-based offset storage)
在"/consumers/[group_id]/ids/[consumer_id]"下注册,new-consumer(kafka-based offset storage)
未在"/consumers/[group_id]"路径下注册 . (我的群集版本是kafka 0.9.0.1)librdkafka高级KafkaConsumer依赖于Apache Kafka 0.9中新的基于代理的 balancer 消费者组,他们根本不使用ZooKeeper .
您可以使用Kafka分发中的
bin/kafka-consumer-groups.sh --new-consumer ..
脚本列出和描述已注册的使用者组 . 还有一个API可以在librdkafka的C接口(rd_kafka_list_groups()
)中以编程方式获取相同的信息,并且很快就会在C中提供 .