我的spring boot项目有一个演示Kafka Streams API的应用程序 . 我可以使用该命令使用主题 customer
中的所有消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic customer --from-beginning
Kafka Streams API中使用KStream或KTable消息的类似命令是什么?我试过了
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); properties.put("auto.offset.reset", "earliest");
两者都没用 . 我确实用 KafkaConsumer
而不是Streams创建了一个测试用例,它没有用 . 代码上传至Github以供参考 . 任何帮助都会很棒 .
1 回答
bin/kafka-streams-application-reset.sh
工具允许从v1.1开始寻找 .参看https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application