首页 文章

Kafka Java Consumer API问题

提问于
浏览
2

我试图使用Kafka java API消费消息 . 能够使用kafka-console-consumer.bat消费消息 . 但是,不能消耗来自java api的消息 . 没有收到任何错误或任何消息 . 帮我搞错了 .

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimpleConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "test";
        SimpleConsumer simpleHLConsumer = new SimpleConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

CONSOLE COMMAND

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

已创建主题 kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test . 使用 kafka-console-producer.bat --broker-list localhost:9092 --topic test 发布消息,如果我运行消费者程序,则登录Broker console [2015-12-29 11:57:34,448] INFO Closing socket connection to /IP (kafka.network.Processor) .

如果我关闭程序获取此日志** java.io.IOException:远程主机**强制关闭现有连接 . 请帮助我为什么我不能消费来自上述程序的消息 .

但是,能够使用 kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 消费消息 . 帮我

1 回答

  • 2

    如果要从头开始阅读消息,则需要设置选项

    auto.offset.reset=smallest
    

    它默认是“最大的” .

    http://kafka.apache.org/documentation.html

    当ZooKeeper中没有初始偏移或偏移超出范围时该怎么办:最小:自动将偏移重置为最小偏移最大值:自动将偏移重置为最大偏移量:将异常抛给消费者

    注意:此选项适用于新的消费者API(自0.9.0.0起):

    auto.offset.reset=earliest|latest|none
    

    您的group.id已在zookeeper中保存了偏移量 . 所以,如果你想看到消息改变group.id或清理zookeeper

相关问题