
Unable to deploy a flow to an Apache Flink HA cluster using the Flink CLI


I can deploy a flow to a standalone Apache Flink installation (one JobManager and several TaskManagers) without any problems:

bin/flink run -m example-app-1.stag.local:6123 -d -p 4 my-flow-fat-jar.jar <flow parameters>

But when I run the same command against a standalone HA cluster, it fails with the following error:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.client.program.Client.runDetached(Client.java:406)
    at org.apache.flink.client.program.Client.runDetached(Client.java:366)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
    at org.apache.flink.client.program.Client.runDetached(Client.java:278)
    at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221)
    at org.apache.flink.client.program.Client.runDetached(Client.java:403)
    ... 7 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218)
    ... 8 more

The active JobManager writes the following to its log:

2016-04-14 13:54:44,160 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-04-14 13:54:46,299 WARN  org.apache.flink.runtime.jobmanager.JobManager                - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None.
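The second log line describes a filter on the JobManager side: a message is processed only if the leader session ID it carries equals the one the leading JobManager expects, and here the received ID is None. The following toy Python model is not Flink code (the class names and the "submit-job" payload are hypothetical); it only sketches, under that assumption, why a discarded message shows up on the client as nothing more than a timeout.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    # Hypothetical stand-in for a leader-session-tagged message: a payload plus
    # the leader session ID the sender believes is current (None if unknown).
    leader_session_id: Optional[uuid.UUID]
    payload: str


class ToyJobManager:
    """Toy model (not Flink code) of a leader that only accepts messages
    carrying its own leader session ID."""

    def __init__(self):
        # The current leader's session ID; a client must learn it somehow
        # before its messages are accepted.
        self.leader_session_id = uuid.uuid4()

    def handle(self, msg):
        if msg.leader_session_id != self.leader_session_id:
            # Like the "Discard message ..." warning: the message is dropped
            # and no reply is sent, so the sender can only wait and time out.
            return None
        return f"accepted {msg.payload}"


if __name__ == "__main__":
    jm = ToyJobManager()
    # A sender that does not know the leader session ID gets no reply.
    print(jm.handle(Message(None, "submit-job")))
    # A sender that knows the current leader session ID is served.
    print(jm.handle(Message(jm.leader_session_id, "submit-job")))
```

In this model the first call prints None and the second prints "accepted submit-job"; the silent drop is what would turn into a client-side "did not respond within 60000 milliseconds" rather than an explicit rejection.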

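So I don't understand what could cause this error.

Please let me know if any other information is needed.

P.S.

Deploying from the Flink Dashboard works with the standalone HA cluster. The problem occurs only when I deploy through the Flink CLI.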

Update

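I cleared ZooKeeper, cleared the directories Flink uses on disk, and redeployed the Flink standalone HA cluster. Then I tried to run the flow with the bin/flink run command. As you can see, the JobManager wrote only one line about the problem (see flink-jobmanager-0-example-app-1.stag.local.log).

All JobManagers and TaskManagers use the same flink-conf.yaml: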

jobmanager.heap.mb: 1024
jobmanager.web.port: 8081

taskmanager.data.port: 6121
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
taskmanager.tmp.dirs: /flink/data/task_manager

blob.server.port: 6130
blob.storage.directory: /flink/data/blob_storage

parallelism.default: 4

state.backend: filesystem
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints

restart-strategy: none
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 60s

recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink
recovery.zookeeper.storageDir: s3a://example-flink/recovery
recovery.jobmanager.port: 6123

fs.hdfs.hadoopconf: /flink/conf

So the standalone HA cluster appears to be configured correctly.

Update 2

FYI: I want to set up a standalone HA cluster as described here, not a YARN HA cluster.

Update 3

Here is the log created by the bin/flink CLI: flink-username-client-hostname.local.log.

Answer


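When a Flink cluster is started in HA mode, the JobManager's address and its leader session ID are written to the specified ZooKeeper cluster. To communicate with the JobManager, you need to know not only its address but also its leader session ID. Therefore, you must specify the following parameters in the flink-conf.yaml that the CLI reads: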

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: address of your cluster
    recovery.zookeeper.path.root: ZK path you've started your cluster with
    

With this information, the client knows where to find the ZooKeeper cluster and where in it to look up the JobManager's address and its leader session ID.
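One way to check this is to compare the recovery.* keys in the flink-conf.yaml used by the CLI with the ones the cluster was started with. The Python sketch below is a hypothetical helper, not part of Flink. It assumes a flat "key: value" file without nested YAML, takes the cluster values from the question's configuration, and uses an invented client configuration that stands for a machine whose flink-conf.yaml was never updated for HA.

```python
# Hypothetical helper, not part of Flink: checks that the client's
# flink-conf.yaml carries the same ZooKeeper HA settings as the cluster.

REQUIRED_HA_KEYS = (
    "recovery.mode",
    "recovery.zookeeper.quorum",
    "recovery.zookeeper.path.root",
)


def parse_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and # comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        # Split on the first colon only; values such as host:port lists
        # and s3a:// URIs contain further colons.
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def ha_problems(client_conf, cluster_conf):
    """Return a list of problems with the client's HA settings."""
    problems = []
    for key in REQUIRED_HA_KEYS:
        if key not in client_conf:
            problems.append(f"missing {key}")
        elif client_conf[key] != cluster_conf.get(key):
            problems.append(f"{key} differs from the cluster configuration")
    return problems


# HA settings taken from the question's flink-conf.yaml.
CLUSTER_CONF = """\
recovery.mode: zookeeper
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181
recovery.zookeeper.path.root: /example/flink
"""

# Invented example: a client configuration without any recovery.* keys.
CLIENT_CONF = """\
parallelism.default: 4
"""

if __name__ == "__main__":
    cluster = parse_flink_conf(CLUSTER_CONF)
    client = parse_flink_conf(CLIENT_CONF)
    for problem in ha_problems(client, cluster):
        print(problem)
```

For the invented client configuration this prints one "missing ..." line for each of the three keys. An empty result only means the keys match; it does not show that the client machine can actually reach the ZooKeeper quorum.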
