首页 文章

Flink Kafka:为什么我会丢失消息?

提问于
浏览
1

我写了一个非常简单的Flink流媒体作业,它使用 FlinkKafkaConsumer082 从Kafka获取数据 .

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
    Properties result = new Properties();
    result.put("bootstrap.servers", getBrokerUrl());
    result.put("zookeeper.connect", getZookeeperUrl());
    result.put("group.id", getGroup());

        return env.addSource(
                new FlinkKafkaConsumer082<>(
                        topic,
                        new SimpleStringSchema(), result);
}

这非常有效,每当我在Kafka的主题中加入某些内容时,我的Flink工作就会收到它并进行处理 . 现在我试着看看如果我的Flink Job由于某种原因不在线会发生什么 . 所以我关闭了flink工作并继续向Kafka发送消息 . 然后我又开始了我的Flink工作,并期望它会处理同时发送的消息 .

但是,我收到了这条消息:

No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]

因此它基本上忽略了自上次关闭Flink作业以来刚刚开始在队列末尾读取的所有消息 . 从我收集的 FlinkKafkaConsumer082 的文档中,它自动处理与Kafka代理同步已处理的偏移 . 然而,情况似乎并非如此 .

我正在使用单节点Kafka安装(Kafka发行版附带的安装)与单节点Zookeper安装(也是与Kafka发行版捆绑的安装) .

我怀疑它是某种错误配置或类似的东西,但我真的不知道从哪里开始寻找 . 有没有其他人有这个问题,也许解决了吗?

2 回答

  • 2

    我找到了原因 . 您需要在 StreamExecutionEnvironment 中明确启用检查点,以使Kafka连接器将已处理的偏移量写入Zookeeper . 如果不启用它,Kafka连接器将不会写入最后读取的偏移量,因此在重新启动收集作业时无法从那里恢复 . 所以一定要写:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(); // <-- this is the important part
    

    如果由于某种原因检查点失败,Anatoly建议更改初始偏移可能仍然是一个好主意 .

  • 4

    https://kafka.apache.org/08/configuration.html

    将auto.offset.reset设置为最小值(默认情况下为最大值)

    auto.offset.reset

    当Zookeeper中没有初始偏移或偏移超出范围时该怎么办:最小:自动将偏移重置为最小偏移最大值:自动将偏移重置为最大偏移量:将异常抛给消费者 . 如果将此设置为最大,则当订阅的主题的分区数在代理上发生更改时,消费者可能会丢失一些消息 . 要防止分区添加期间数据丢失,请将auto.offset.reset设置为最小

    还要确保重启后getGroup()是相同的

相关问题