首页 文章

Kafka&Flink在重启时重复发送消息

提问于
浏览
4

首先,这与Kafka consuming the latest message again when I rerun the Flink consumer非常相似,但它不一样 . 这个问题的答案似乎并没有解决我的问题 . 如果我错过了答案,那么请重新解释答案,因为我显然错过了一些东西 .

但问题完全相同--Flink(kafka连接器)重新运行它在关闭之前看到的最后3-9条消息 .

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的代码

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

我的SBT依赖项

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的过程

(3个终端)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统中没有错误时,我希望能够在不重新处理先前运行中成功完成流的消息的情况下打开和关闭flink .

我的尝试修复

我已经添加了对 setStateBackend 的调用,认为可能默认的内存后端似乎没有帮助 .

我已经删除了对 enableCheckpointing 的调用,希望可能有一个单独的机制来跟踪Flink vs Zookeeper中的状态 . 这似乎没有帮助 .

我使用过不同的接收器,RollingFileSink,print();希望也许这个错误发生在 Kafka . 这似乎没有帮助 .

我已经回滚到flink(以及所有连接器)v1.1.0和v1.1.1,希望可能是最新版本的bug . 这似乎没有帮助 .

我已经将 zookeeper.connect 配置添加到属性对象中,希望关于它的注释仅在0.8中有用是错误的 . 这似乎没有帮助 .

我明确地将检查点模式设置为 EXACTLY_ONCE (好主意drfloob) . 这似乎没有帮助 .

我的辩护

救命!

2 回答

  • 1

    (我在JIRA中发布了相同的回复,只是在这里交叉发布)

    根据你的描述,我假设你手动关闭工作,然后重新提交,对吗?

    除非您使用保存点(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html),否则Flink不会在手动重新启动时保留一次 . 完全一次保证是指作业失败时,然后自动从以前的检查点恢复(当启用检查点时,就像您使用env.enableCheckpointing(500)所做的那样)

    实际发生的是,当您手动重新提交作业时,Kafka消费者只是从ZK / Kafka中提交的现有偏移中开始阅读 . 这些抵消是在您第一次执行工作时致力于ZK / Kafka . 然而,它们并不用于Flink的一次性语义; Flink使用内部检查点Kafka偏移量 . Kafka 的消费者将这些补偿权交还给ZK,只是为了向外界揭示工作消费的进展情况(Flink) .

  • 8

    更新2:我使用偏移处理修复了错误,它已合并到当前的MASTER中 .

    更新:不是问题,在取消工作前使用手动保存点(感谢Gordon)

    我检查了日志,这似乎是偏移处理中的一个错误 . 我在https://issues.apache.org/jira/browse/FLINK-4618下提交了一份报告 . 我得到反馈后会更新这个答案 .

相关问题