首页 文章

Flink - 查询Kafka主题以获取消费者群体的偏移量?

提问于
浏览
1

Question: How can I query a kafka topic for the offset of a specific consumer group inside of flink code? (And side question (will make a new question on here if I need to ). How, if possible, can I get a timestamp of that offset?

(我发现有一些cli工具可以查询它,但这不是我想要的,因为它不是在flink工作中以编程方式完成的)

关于完整问题的一些额外背景,但我不想让这个太开放 .

我有一个用例,数据将从kafkaTopic1流入程序(让我们称之为P1),处理,然后持久化到数据库 . P1将位于多节点集群上,因此每个节点将处理多个kafka分区(假设该主题有5个节点和50个kafka分区) . 如果其中一个节点因任何原因完全失败并且正在处理数据,则该数据将丢失 .

例如,如果kafkaTopic1上有500条消息,并且node2已经提取了10条消息(因此根据偏移量提取的下一条消息是消息11),但是当节点发生故障时,其中只有8条已完全处理并持久保存到数据库中,仍在处理的2将丢失 . 当节点重新启动时,它将从消息11开始读取,跳过两个丢失的消息(从技术上讲,kafka分区将开始将其消息发送到另一个节点进行处理,以便该分区的偏移量会移动,我们不会必须确切知道当节点死亡时接下来要处理的消息是什么 .

(注意:当节点死亡时,假设用户注意并完全关闭P1,因此暂时不再处理数据) .

所以这就是flink发挥作用的地方 . 我想创建一个flink作业,可以通过用户的参数告诉P1的消费者组,然后查询kafka主题(也由用户提供)以获取当前偏移量(OS1) . 然后,flink作业将kafkaTopic1的偏移量设置为OS1之前的X时间量(用户通过args提供的X)并开始从kafka主题中读取消息 . 然后它会将它读取的每条消息与数据库中的内容进行比较,如果它没有在数据库中找到消息,它会将其发送到另一个kafka主题(kafkaTopic2),以便在重新启动时由P1处理 .

1 回答

  • 1

    如果在flink作业中启用了检查点,那么您不应该丢失消息,因为flink也会在内部维护偏移量,并且在从故障中恢复后,它应该从上次提交的偏移量flink中读取 .

    现在,如果您仍然想要找到偏移量并从偏移量重新开始读取,那么它会变得棘手,因为您需要找到给定使用者组的给定主题的所有分区的偏移量 .

    我不知道如何从Flink-kafka-Consumer API中开箱即用,但您可以将kafka依赖项添加到项目中,并从Kafka API创建kafkaconsumer . 有消费者后,您可以致电

    consumer.position(partition)
    

    要么

    consumer.committed(partition)
    

    请注意,您仍需要遍历所有分区以获取所有当前偏移量

    阅读这里的差异:Kafka Javadoc

    获得要读取的偏移量后,可以使用以下内容手动指定flink作业中的使用者偏移量:

    Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
    specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
    
    myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
    

    有关Flink-kafka-Consumer的更多信息,请查看Flink Kafka Connector

相关问题