我不确定这个论坛是否是一个正确的问题 . 我们使用Storm KafkaSpout连接器消耗了Storm的Kafka主题 . 到目前为止工作正常 . 现在我们应该连接到一个新的Kafka集群,该集群的升级版本 0.10.x
来自同一个版本 0.10.x
的Storm env .
从风暴文档(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到风暴 1.1.0
与支持新Kafka消费者API的Kafka 0.10.x
兼容 . 但在这种情况下,我将无法在最终运行拓扑(如果我错了,请纠正我) .
这有什么工作吗?我已经看到,即使New Kafka Consumer API已经删除了ZooKeeper依赖项,但我们仍然可以通过传递 --zookeeper
标志而不是新的 –bootstrap-server
标志(推荐)使用旧的 Kafka-console-consumer.sh
消耗来自它的消息 . 我使用Kafka 0.9运行此命令,并能够使用Kafka 0.10.x上托管的主题
当我们尝试连接时获得以下异常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]
但是我们能够连接到远程ZK服务器并验证路径是否存在:
./zkCli.sh -server remoteZKServer:2181
[zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
[3, 2, 1, 0]
正如我们在上面所看到的那样,它给出了我们预期的输出,因为主题中有4个分区 .
此时有以下问题:
1)是否可以使用Storm版本 0.10.x
连接到Kafka 0.10.x
?有人试过吗?
2)即使我们能够使用,我们还是需要进行任何代码更改,以便在拓扑关闭/重启时检索消息偏移 . 我问这个,因为我们将传递Zk集群细节而不是旧KafkaSpout版本支持的代理信息 .
这里的选项用完了,任何指针都会受到高度赞赏
UPDATE:
我们能够在使用eclipse在本地运行时从远程Kafka主题连接和使用 . 为了确保storm不使用内存中的zk,我们使用了重载的构造函数 LocalCluster("zkServer",port)
,它工作正常,我们可以看到数据即将到来 . 这使我们得出结论,版本兼容性可能不是这里的问题 .
但是在群集中部署拓扑时仍然没有运气 . 我们已经验证了从暴风箱到zkservers的连接性znode似乎也很好..
在这一点上真的需要一些指针,这可能是错误的,我们如何调试呢?从来没有和Kafka 0.10x合作过,所以不确定我们到底错过了什么 .
真的很感激一些帮助和建议
1 回答
Storm 0.10x与Kafka 0.10x兼容 . 我们仍然可以使用依赖于zookeeper的偏移存储机制的旧
KafkaSpout
.连接丢失异常即将到来,因为我们试图到达一个不允许/接受来自我们端的连接的远程Kafka集群 . 我们需要打开特定的防火墙端口,以便 Build 连接 . 似乎运行拓扑是集群模式时,所有管理节点都应该能够与zookeeper通信,因此防火墙应该为每个节点打开 .