首页 文章

使用Storm 0.10.x(KafkaSpout)从Kafka 0.10.x主题消费

提问于
浏览
1

我不确定这个论坛是否是一个正确的问题 . 我们使用Storm KafkaSpout连接器消耗了Storm的Kafka主题 . 到目前为止工作正常 . 现在我们应该连接到一个新的Kafka集群,该集群的升级版本 0.10.x 来自同一个版本 0.10.x 的Storm env .

从风暴文档(http://storm.apache.org/releases/1.1.0/storm-kafka-client.html)我可以看到风暴 1.1.0 与支持新Kafka消费者API的Kafka 0.10.x 兼容 . 但在这种情况下,我将无法在最终运行拓扑(如果我错了,请纠正我) .

这有什么工作吗?我已经看到,即使New Kafka Consumer API已经删除了ZooKeeper依赖项,但我们仍然可以通过传递 --zookeeper 标志而不是新的 –bootstrap-server 标志(推荐)使用旧的 Kafka-console-consumer.sh 消耗来自它的消息 . 我使用Kafka 0.9运行此命令,并能够使用Kafka 0.10.x上托管的主题

当我们尝试连接时获得以下异常:

java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/mytopic/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[stormjar.jar:?]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[stormjar.jar:?]

但是我们能够连接到远程ZK服务器并验证路径是否存在:

./zkCli.sh -server remoteZKServer:2181

      [zk: remoteZKServer:2181(CONNECTED) 5] ls /brokers/topics/mytopic/partitions
      [3, 2, 1, 0]

正如我们在上面所看到的那样,它给出了我们预期的输出,因为主题中有4个分区 .

此时有以下问题:

1)是否可以使用Storm版本 0.10.x 连接到Kafka 0.10.x ?有人试过吗?

2)即使我们能够使用,我们还是需要进行任何代码更改,以便在拓扑关闭/重启时检索消息偏移 . 我问这个,因为我们将传递Zk集群细节而不是旧KafkaSpout版本支持的代理信息 .

这里的选项用完了,任何指针都会受到高度赞赏

UPDATE:
我们能够在使用eclipse在本地运行时从远程Kafka主题连接和使用 . 为了确保storm不使用内存中的zk,我们使用了重载的构造函数 LocalCluster("zkServer",port) ,它工作正常,我们可以看到数据即将到来 . 这使我们得出结论,版本兼容性可能不是这里的问题 .

但是在群集中部署拓扑时仍然没有运气 . 我们已经验证了从暴风箱到zkservers的连接性znode似乎也很好..

在这一点上真的需要一些指针,这可能是错误的,我们如何调试呢?从来没有和Kafka 0.10x合作过,所以不确定我们到底错过了什么 .

真的很感激一些帮助和建议

1 回答

  • 1

    Storm 0.10x与Kafka 0.10x兼容 . 我们仍然可以使用依赖于zookeeper的偏移存储机制的旧 KafkaSpout .

    连接丢失异常即将到来,因为我们试图到达一个不允许/接受来自我们端的连接的远程Kafka集群 . 我们需要打开特定的防火墙端口,以便 Build 连接 . 似乎运行拓扑是集群模式时,所有管理节点都应该能够与zookeeper通信,因此防火墙应该为每个节点打开 .

相关问题