首页 文章

有关Flink外部检查点的两个问题

提问于
浏览
3

关于Flink外化检查点,我有两个问题

(Q1)我可以在flink-conf.yaml中设置"state.checkpoints.dir"以使外部化检查点正常工作,但是当我从IDE运行flink时如何实现相同的功能呢?我尝试了(http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html)中提到的GlobalConfiguration方法,但没有运气 . 我就这样做了:

Configuration cfg =
                GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

这是IDE中显示的错误信息:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
    ... 19 more

Process finished with exit code 1

(Q2)在检查站的文件(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html)中,它显示"This way, you will have a checkpoint around to resume from if your job fails.",被取消的工作怎么样?新工作是否会继续使用现有检查点,还是从自己的检查点开始?

3 回答

  • 2

    您可以控制在取消作业时是否删除外部化检查点 . 如果要保留它们,可以执行以下操作:

    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    有关详细信息,请参阅the docs .

    resume from an externalized checkpoint执行此操作(与从保存点恢复相同):

    $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
    
  • 1

    从Eclipse设置检查点目录,通常我只是在设置要使用的后端时执行此操作,例如,

    env.setStateBackend(new FsStateBackend(options.getCheckpointDir()));
    

    重新取消的作业 - 检查点目录被删除 . 如果要在停止(取消)作业后从已知状态恢复,则需要执行保存点 .

  • 1

    第一个问题,使用自定义配置创建本地环境:

    val conf = new Configuration()
    conf.setString(CoreOptions.CHECKPOINTS_DIRECTORY, "file:///user/flink/checkpoint/storing/")
    StreamExecutionEnvironment.createLocalEnvironment(4, conf)
    

    第二个问题,正如大卫所说:

    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

相关问题