我正在尝试使用3个代理和zookeeper来测试运行单个Kafka节点 . 我希望使用控制台工具进行测试 . 我这样运行 生产环境 者:
kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic testTopic
然后我这样运行消费者:
kafka-console-consumer --zookeeper localhost:2181 --topic testTopic --from-beginning
我可以按照预期在 生产环境 者中输入消息并在消费者中查看消息 . However ,当我使用bootstrap-server运行使用者的更新版本时,我什么都没得到 . 例如
kafka-console-consumer --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic testTopic --from-beginning
当我在端口9092上运行一个代理时,这很好用,所以我很困惑 . 有没有办法可以看到zookeeper提供什么作为引导程序服务器?引导服务器与代理列表不同吗? Kafka使用Scala 2.11编译 .
3 回答
我不知道出了什么问题 . 可能我把Kafka或Zookeeper置于一种奇怪的状态 . 删除每个代理的
log.dir
中的主题和/brokers/topics
中的zookeeper主题然后重新创建主题后,Kafka消费者的行为与预期一致 .引导服务器与kafka代理相同 . 如果您想查看zookeeper提供的引导服务器列表,您可以通过任何ZK客户端查询ZNode信息 . 所有活跃的经纪人都在/ brokers / ids / [brokerId]下注册 . 您只需要zkQuorum地址 . 下面的命令将为您提供活动引导服务器的列表:
./zookeeper-shell.sh localhost:2181 <<<“ls / brokers / ids”
我在使用不匹配的版本时遇到了同样的问题:
Kafka客户端库
Kafka脚本
Kafka 经纪人
在我的确切场景中,我使用Confluent Kafka客户端库版本0.10.2.1与Confluent Platform 3.3.0 w / Kafka broker 0.11.0.0 . 当我将Confluent Platform降级到与我的客户端库匹配的3.3.2时,消费者按预期工作 .
我的理论是,使用新的Consumer API的最新kafka-console-consumer只使用最新格式检索消息 . Kafka 0.11.0.0中引入了许多消息格式更改 .