首页 文章

Apache Flink是否将所有添加功能的状态序列化为流?甚至在本地部署?

提问于
浏览
0

我正在为测试用例编写一些函数,它们被注入一些对象的共享引用;假设 SinkFunction 像这样:

class Collector[T](collection: ListBuffer[T]) extends SinkFunction[T] {
    override def invoke(in: T, context: SinkFunction.Context[_]): Unit = {
        collection.append(in)
    }
}

和测试代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = ListBuffer.empty[String]
env.fromElements("Hello").addSink(new Collector(list))
env.execute()
println(list)

我运行我的测试,但在测试结束时 list 是空的!我检查文档(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)我发现原始示例使用单例引用 .

所以我要确定Apache Flink如何在内部工作:即使在本地部署中,它是否将所有添加的功能序列化到流中?

1 回答

  • 0

    是的,Flink序列化了所有功能 . 例如,如果你看一下SinkFunction,你会注意到 implements Serializable .

    如果要在作业和客户端(将作业发送到Flink的程序)之间共享数据,则必须使用文件,套接字,消息传递(RMQ,Kafka)或类似机制自行管理 .

相关问题