我正在为测试用例编写一些函数,它们被注入一些对象的共享引用;假设 SinkFunction
像这样:
class Collector[T](collection: ListBuffer[T]) extends SinkFunction[T] {
override def invoke(in: T, context: SinkFunction.Context[_]): Unit = {
collection.append(in)
}
}
和测试代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = ListBuffer.empty[String]
env.fromElements("Hello").addSink(new Collector(list))
env.execute()
println(list)
我运行我的测试,但在测试结束时 list
是空的!我检查文档(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)我发现原始示例使用单例引用 .
所以我要确定Apache Flink如何在内部工作:即使在本地部署中,它是否将所有添加的功能序列化到流中?
1 回答
是的,Flink序列化了所有功能 . 例如,如果你看一下SinkFunction,你会注意到
implements Serializable
.如果要在作业和客户端(将作业发送到Flink的程序)之间共享数据,则必须使用文件,套接字,消息传递(RMQ,Kafka)或类似机制自行管理 .