我试图使用Kafka java API消费消息 . 能够使用kafka-console-consumer.bat消费消息 . 但是,不能消耗来自java api的消息 . 没有收到任何错误或任何消息 . 帮我搞错了 .
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class SimpleConsumer {
private final ConsumerConnector consumer;
private final String topic;
public SimpleConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
this.topic = topic;
}
public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("Message from Single Topic: " + new String(it.next().message()));
}
}
if (consumer != null) {
consumer.shutdown();
}
}
public static void main(String[] args) {
String topic = "test";
SimpleConsumer simpleHLConsumer = new SimpleConsumer("localhost:2181", "testgroup", topic);
simpleHLConsumer.testConsumer();
}
}
CONSOLE COMMAND
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
已创建主题 kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test . 使用 kafka-console-producer.bat --broker-list localhost:9092 --topic test 发布消息,如果我运行消费者程序,则登录Broker console [2015-12-29 11:57:34,448] INFO Closing socket connection to /IP (kafka.network.Processor) .
如果我关闭程序获取此日志** java.io.IOException:远程主机**强制关闭现有连接 . 请帮助我为什么我不能消费来自上述程序的消息 .
但是,能够使用 kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 消费消息 . 帮我
1 回答
如果要从头开始阅读消息,则需要设置选项
它默认是“最大的” .
http://kafka.apache.org/documentation.html
注意:此选项适用于新的消费者API(自0.9.0.0起):
您的group.id已在zookeeper中保存了偏移量 . 所以,如果你想看到消息改变group.id或清理zookeeper