首页 文章

从Flink的kafka主题开始消费

提问于
浏览
2

How do I make sure I always consume from the beginning of a Kafka topic with Flink?

由于Kafka 0.9.x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka而是Flink来控制偏移量:

Flink在内部将偏移量作为其分布式检查点的一部分进行快照 . 提交给Kafka / ZooKeeper的抵消只是为了让外部的进展观与Flink的进展观同步 . 这样,监控和其他工作可以了解Flink Kafka消费者在多大程度上消耗了一个主题 .

这是我得到了多远,但我的Flink程序始终从它停止的地方开始,并且不会返回到开头,因为配置指示它:

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")

val incomingData = env.addSource(
  new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name",
    new IncomingDataSchema,
    props
  )
)

1 回答

  • 0

    我认为你可以通过指定随机_1679248来解决这个问题:

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
    props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest"
    

    auto.offset.reset 仅在ZooKeeper中没有可用的初始偏移时才有效 .

相关问题