首页 文章

使用Kafka java producer发送消息: 生产环境 者连接到localhost:9092不成功

提问于
浏览
3

我想向Kafka服务器发送消息 . 代理列表不包括localhost . 但是当产生调用send方法时,它有异常: 生产环境 者与localhost的连接:9092不成功

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

// create a producer

Properties props = new Properties();

props.put("metadata.broker.list", "192.168.1.203:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

producer = new Producer<String, String>(config);

//sending...
String topic = "data_collect_events";
String message = "_Message_1";
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message);
producer.send(keyedMessage);

例外:

ERROR [main] 2013-07-23 19:27:10,580 kafka.utils.Logging$class: Producer connection to localhost:9092 unsuccessful
java.net.ConnectException: Connection refused: connect
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:364)
    at sun.nio.ch.Net.connect(Net.java:356)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at enter code herekafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)

3 回答

  • 4

    在kafka 0.8中,代理列表仅用于检索元数据 . 然后, 生产环境 者使用返回的元数据信息连接到代理 . 元数据中的主机名取决于主机的OS设置(请参阅https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F) .

    您可以在server.properties中设置host.name . 像这样

    host.name=192.168.1.203
    
  • 0

    很可能你正在使用Kafka 0.7配置密钥用于Kafka 0.8,所以

    props.put("metadata.broker.list", "192.168.1.203:9092");
    

    如果没有指定代理,则简单地忽略并且 生产环境 者默认为localhost:9092 .
    You have to use broker.list 而不是 meta.broker.list .

  • 4

    我有一个类似的问题,在EC2之外的Kafka制作人无法按照Zookeeper的预期解析内部EC2 IP .

    我编辑了 /etc/hosts 文件,为 生产环境 者添加了一个条目: public-ipinternal-ec2-ip ,以便与Kafka经纪人交谈 .

相关问题