我想从服务器的主题开始收到所有消息 .
例如:
bin / kafka-console-consumer.sh --zookeeper localhost:2181 - topic testTopic --from-beginning
使用上面的控制台命令时,我希望能够从一开始就获取主题中的所有消息,但是从使用java代码开始,我无法使用主题中的所有消息 .
最简单的方法是启动消费者并消耗所有消息 . 现在我不知道您的主题中有多少分区,以及您是否已经拥有现有的消费者群组,但您有以下几种选择:
看看这个API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
1)如果您已在同一个使用者组中拥有使用者,但仍希望从头开始使用,则应使用API文档中列出的 seek 选项,并为该组中的每个使用者设置偏移量为0 . 这将从一开始就开始消耗 .
seek
2)否则,您可以在新的消费者群体中启动一些消费者,您不必担心寻求 .
PS:如果您对Kafka有更多疑问,请记得在将来提供有关您的设置的更多详细信息 . 很多事情取决于您如何配置您的基础架构以及您希望它如何,因此会因具体情况而异 .
TopicPartition topicPartition = new TopicPartition(topic,0); List partitions = Arrays.asList(topicPartition); consumer.assign(分区); consumer.seekToBeginning(分区);
只需更改消费者群组即可
ConsumerConfig.GROUP_ID_CONFIG - 到新的组ID
并设定
AUTO_OFFSET_RESET_CONFIG - 最早的
样本代码 -
props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
您可以使用以下命令获取所有消息:
cd Users/kv/kafka/bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning --max-messages 100
4 回答
最简单的方法是启动消费者并消耗所有消息 . 现在我不知道您的主题中有多少分区,以及您是否已经拥有现有的消费者群组,但您有以下几种选择:
看看这个API:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
1)如果您已在同一个使用者组中拥有使用者,但仍希望从头开始使用,则应使用API文档中列出的
seek
选项,并为该组中的每个使用者设置偏移量为0 . 这将从一开始就开始消耗 .2)否则,您可以在新的消费者群体中启动一些消费者,您不必担心寻求 .
PS:如果您对Kafka有更多疑问,请记得在将来提供有关您的设置的更多详细信息 . 很多事情取决于您如何配置您的基础架构以及您希望它如何,因此会因具体情况而异 .
TopicPartition topicPartition = new TopicPartition(topic,0); List partitions = Arrays.asList(topicPartition); consumer.assign(分区); consumer.seekToBeginning(分区);
只需更改消费者群组即可
并设定
样本代码 -
您可以使用以下命令获取所有消息: