首页 文章

如何从kafka服务器获取主题中的所有消息

提问于
浏览
1

我想从服务器的主题开始收到所有消息 .

例如:

bin / kafka-console-consumer.sh --zookeeper localhost:2181 - topic testTopic --from-beginning

使用上面的控制台命令时,我希望能够从一开始就获取主题中的所有消息,但是从使用java代码开始,我无法使用主题中的所有消息 .

4 回答

  • 0

    最简单的方法是启动消费者并消耗所有消息 . 现在我不知道您的主题中有多少分区,以及您是否已经拥有现有的消费者群组,但您有以下几种选择:

    看看这个API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    1)如果您已在同一个使用者组中拥有使用者,但仍希望从头开始使用,则应使用API文档中列出的 seek 选项,并为该组中的每个使用者设置偏移量为0 . 这将从一开始就开始消耗 .

    2)否则,您可以在新的消费者群体中启动一些消费者,您不必担心寻求 .

    PS:如果您对Kafka有更多疑问,请记得在将来提供有关您的设置的更多详细信息 . 很多事情取决于您如何配置您的基础架构以及您希望它如何,因此会因具体情况而异 .

  • 4

    TopicPartition topicPartition = new TopicPartition(topic,0); List partitions = Arrays.asList(topicPartition); consumer.assign(分区); consumer.seekToBeginning(分区);

  • 0

    只需更改消费者群组即可

    ConsumerConfig.GROUP_ID_CONFIG - 到新的组ID

    并设定

    AUTO_OFFSET_RESET_CONFIG - 最早的

    样本代码 -

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
  • 1

    您可以使用以下命令获取所有消息:

    cd Users/kv/kafka/bin
    
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning --max-messages 100
    

相关问题