我想向Kafka服务器发送消息 . 代理列表不包括localhost . 但是当产生调用send方法时,它有异常: 生产环境 者与localhost的连接:9092不成功
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
// create a producer
Properties props = new Properties();
props.put("metadata.broker.list", "192.168.1.203:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
//sending...
String topic = "data_collect_events";
String message = "_Message_1";
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, message);
producer.send(keyedMessage);
例外:
ERROR [main] 2013-07-23 19:27:10,580 kafka.utils.Logging$class: Producer connection to localhost:9092 unsuccessful
java.net.ConnectException: Connection refused: connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:364)
at sun.nio.ch.Net.connect(Net.java:356)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:623)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at enter code herekafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.javaapi.producer.Producer.send(Producer.scala:32)
3 回答
在kafka 0.8中,代理列表仅用于检索元数据 . 然后, 生产环境 者使用返回的元数据信息连接到代理 . 元数据中的主机名取决于主机的OS设置(请参阅https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F) .
您可以在server.properties中设置host.name . 像这样
很可能你正在使用Kafka 0.7配置密钥用于Kafka 0.8,所以
如果没有指定代理,则简单地忽略并且 生产环境 者默认为localhost:9092 .
You have to use
broker.list
而不是meta.broker.list
.我有一个类似的问题,在EC2之外的Kafka制作人无法按照Zookeeper的预期解析内部EC2 IP .
我编辑了
/etc/hosts
文件,为 生产环境 者添加了一个条目:public-ip
&internal-ec2-ip
,以便与Kafka经纪人交谈 .