首页 文章

kafka streams app - 重启时忽略旧消息

提问于
浏览
2

我处理实时应用程序的时间序列数据 . 所以旧数据没有意义 . 我只想处理流应用程序启动后收到的数据,而不是之前提交的偏移量 . 重启后忽略kafka流应用程序上的旧记录的正确方法是什么?

使用kafka使用者API我通常使用 seekToEnd() 方法跳转到最新记录 . 是否有流的等效机制?我想避免过滤自上次提交以来忽略旧消息的所有消息 .

1 回答

  • 0

    您可以使用Kafka Consumer API创建另一个使用者,其中 groupId 与kafka-stream的 applicationId 相同,并在启动流之前使用该使用者执行 seekToEnd() . 禁用此特殊使用者的autoCommit并在 seekToEnd() 之后手动提交偏移量 . 然后尝试启动您的流 .

    确保在重置消费者的偏移量提交之前,流尚未启动 .

相关问题