首页 文章

如何从read_committed Kafka Consumer获得最后一次提交的偏移量

提问于
浏览
5

我正在使用事务性KafkaProducer向主题发送消息 . 这很好用 . 我使用具有read_committed隔离级别的KafkaConsumer,我遇到了seek和seekToEnd方法的问题 . 根据文档,seek和seekToEnd方法给了我LSO(Last Stable Offset) . 但这有点令人困惑 . 因为它给我总是相同的值,主题的结尾 . 无论最后一个条目是由提交者提交还是由中止事务的一部分提交 . 例如,在我中止最后5次尝试插入20_000条消息后,消费者不应读取最后100_000条记录 . 但是在seekToEnd期间,它会移动到主题的末尾(包括100_000条消息) . 但是poll()并没有返回它们 .

我正在寻找一种方法来检索Last Committed Offset(生成器最后一次成功提交的消息) . 似乎没有适当的API方法 . 所以我需要自己动手吗?

选项是返回并轮询直到不再检索到记录,这将导致最后提交的消息 . 但我认为 Kafka 提供了这种方法 .

我们使用Kafka 1.0.0 .

1 回答

  • 0

    KafkaConsumer 有一些很好的方法,如: partitionForbegginingOffsetsendOffsetscommitedposition .

    检查哪一个符合您的需求 . 特别仔细考虑所有4种与偏移相关的方法 . 方法 partitionFor 返回包含其他信息的完整元数据对象,但对于丰富日志记录非常有用 .

相关问题