首页 文章

Apache Flink:逐步执行

提问于
浏览
3

由于性能测量,我想逐步执行为Flink编写的Scala程序,即

execute first operator; materialize result;
execute second operator; materialize result;
...

等等 . 原始代码:

var filename = new String("<filename>")
var text = env.readTextFile(filename)
var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts.writeAsText("file://result.txt", WriteMode.OVERWRITE)
env.execute()

所以我希望 var counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1) 的执行是逐步的 .

在每个操作员正确的方式之后调用 env.execute() 吗?

或者在每次操作后写入 /dev/null ,即调用 counts.writeAsText("file:///home/username/dev/null", WriteMode.OVERWRITE) 然后调用 env.execute() 更好的选择?为此,Flink实际上有类似_1679380的东西吗?

edit: 我正在群集上使用Flink Scala Shell,并使用parallelism = 1设置应用程序以执行上述代码 .

1 回答

  • 3

    Flink默认使用流水线数据传输来提高作业执行的性能 . 但是,您也可以通过调用强制批量数据传输

    ExecutionEnvironment env = ...
    env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
    

    这将分离两个运算符的执行(除非它们被链接) . 您可以从日志文件中获取每个任务的执行时间,也可以查看Web仪表板 . 注意,这对于链式运算符(即具有相同并行性且不需要网络混洗的运算符)不起作用 . 此外,您应该知道使用批量传输会增加程序的总体执行时间 . 我认为不可能真正区分流水线数据处理器中运算符的执行时间 .

    在每个运算符不起作用之后调用 execute() 因为,Flink还不支持在内存中缓存结果 . 因此,如果执行运算符2,您将需要将运算符1的结果写入某个持久存储并再次读取它或再次执行运算符1 .

相关问题