我写了一个非常简单的Flink流媒体作业,它使用 FlinkKafkaConsumer082
从Kafka获取数据 .
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}
这非常有效,每当我在Kafka的主题中加入某些内容时,我的Flink工作就会收到它并进行处理 . 现在我试着看看如果我的Flink Job由于某种原因不在线会发生什么 . 所以我关闭了flink工作并继续向Kafka发送消息 . 然后我又开始了我的Flink工作,并期望它会处理同时发送的消息 .
但是,我收到了这条消息:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]
因此它基本上忽略了自上次关闭Flink作业以来刚刚开始在队列末尾读取的所有消息 . 从我收集的 FlinkKafkaConsumer082
的文档中,它自动处理与Kafka代理同步已处理的偏移 . 然而,情况似乎并非如此 .
我正在使用单节点Kafka安装(Kafka发行版附带的安装)与单节点Zookeper安装(也是与Kafka发行版捆绑的安装) .
我怀疑它是某种错误配置或类似的东西,但我真的不知道从哪里开始寻找 . 有没有其他人有这个问题,也许解决了吗?
2 回答
我找到了原因 . 您需要在
StreamExecutionEnvironment
中明确启用检查点,以使Kafka连接器将已处理的偏移量写入Zookeeper . 如果不启用它,Kafka连接器将不会写入最后读取的偏移量,因此在重新启动收集作业时无法从那里恢复 . 所以一定要写:如果由于某种原因检查点失败,Anatoly建议更改初始偏移可能仍然是一个好主意 .
https://kafka.apache.org/08/configuration.html
将auto.offset.reset设置为最小值(默认情况下为最大值)
auto.offset.reset :
还要确保重启后getGroup()是相同的