我正在尝试为第三方的Kafka和ZooKeeper服务器编写Java客户端 . 我能够列出并描述主题,但是当我尝试阅读任何主题时,会引发 ClosedChannelException
. 我使用命令行客户端在这里重现它们 .
$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
备用命令成功:
$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic PartitionCount:2 ReplicationFactor:1 Configs:
Topic: eventbustopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: eventbustopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic
(ips被编辑并替换为255.255.255.255)
当我谷歌这个例外时,我看到 生产环境 者方面的问题 - 事实上, ClientUtils.fetchTopicMetadata
的来源表明这主要是由 生产环境 者使用 .
我担心的一个问题是,这可能是网络布局的一个产物:数据包被Haproxy破坏并通过VPN发送 .
究竟什么在这里工作?
4 回答
代理告诉客户端应该使用哪个主机名来生成/使用消息 . 默认情况下,Kafka使用其运行的系统的主机名 . 如果客户端无法解析此主机名,则会出现此异常 .
您可以尝试将Kafka配置中的
advertised.host.name
设置为客户端应使用的主机名/地址 .这是我解决这个问题的方法:
运行
bin/kafka-server-stop.sh
以停止运行kafka服务器 .通过添加一行修改属性文件
config/server.properties
:listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
重启kafka服务器 .
由于没有lisener设置,kafka将使用
java.net.InetAddress.getCanonicalHostName()
来获取套接字服务器侦听的地址 .你有Zookeeper的问题 .
255.255.255.255:2181
不是有效的Zookeeper地址;这是您网络上的广播地址或子网掩码 . 要使工作正常,请找到运行Zookeeper的计算机的IP地址或主机名 .在AWS上遇到此错误 . 问题是我对安全组过于严格,并将端口2181和9092设置为“我的IP” . 这意味着kafka实例找不到在同一个盒子上运行的ZK .
解决方案 - 打开它 - 一点点 .