首页 文章

Spring kafka消费者,在运行时寻求偏移?

提问于
浏览
4

我正在使用KafkaMessageListenerContainer来消费kafka主题,我有一个应用程序逻辑来处理依赖于其他微服务的每个记录 . 我现在在处理每个记录后手动提交偏移量 .

但是,如果我的应用程序逻辑失败,我需要寻找失败的偏移并继续处理它,直到它成功 . 为此我需要做一个运行时手动搜索最后一个偏移量 .

这可能与KafkaMessageListenerContainer有关吗?

1 回答

  • 3

    Seeking to a Specific Offset .

    为了寻找,你的监听器必须实现具有以下方法的ConsumerSeekAware:void registerSeekCallback(ConsumerSeekCallback callback); void onPartitionsAssigned(Map <TopicPartition,Long> assignments,ConsumerSeekCallback callback); void onIdleContainer(Map <TopicPartition,Long> assignments,ConsumerSeekCallback回调);第一个是在容器启动时调用的;在初始化后的某个任意时间寻找时,应该使用此回调 . 您应该保存对回调的引用;如果您在多个容器(或ConcurrentMessageListenerContainer)中使用相同的侦听器,则应将回调存储在ThreadLocal或由侦听器Thread键入的其他一些结构中 .

相关问题