我在一个独立的集群中运行Spark . 与Master和2个工作节点在同一节点上的python驱动程序应用程序 . 业务逻辑是由在Worker节点上创建的执行程序运行的python代码 .

如果其中一个遗嘱执行人死亡,我最终会陷入困境 . 如果我强行杀死Worker 0上的一个后端进程,Master输出:

16/06/07 16:20:35 ERROR TaskSchedulerImpl: Lost executor 1 on sparkslave0: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/06/07 16:20:35 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, sparkslave0): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
16/06/07 16:20:35 INFO DAGScheduler: Executor lost: 1 (epoch 0)
16/06/07 16:20:35 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/1 is now EXITED (Command exited with code 137)
16/06/07 16:20:35 INFO SparkDeploySchedulerBackend: Executor app-20160607161937-0010/1 removed: Command exited with code 137

然后,超时后:

16/06/07 16:22:35 WARN NettyRpcEndpointRef: Error sending message [message = RemoveExecutor(1)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:225)
    at org.apache.spark.storage.BlockManagerMaster.removeExecutor(BlockManagerMaster.scala:40)
    at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1321)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1628)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 9 more

我可以看到Worker守护进程创建了另一个执行程序,但没有任何反应 . 经过3次尝试,我明白了

16/06/07 16:26:41 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Error notifying standalone scheduler's driver endpoint
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:362)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:144)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(AppClient.scala:186)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Error sending message [message = RemoveExecutor(1,Command exited with code 137)]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:118)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.removeExecutor(CoarseGrainedSchedulerBackend.scala:359)
    ... 9 more
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    ... 3 more
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
    ... 7 more
16/06/07 16:26:41 ERROR DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
org.apache.spark.SparkException: Error sending message [message = RemoveExecutor(1)]
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:118)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:225)
    at org.apache.spark.storage.BlockManagerMaster.removeExecutor(BlockManagerMaster.scala:40)
    at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1321)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1628)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    ... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 9 more
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor added: app-20160607161937-0010/2 on worker-20160607145945-192.168.33.100-34250 (192.168.33.100:34250) with 1 cores
16/06/07 16:26:41 INFO SparkDeploySchedulerBackend: Granted executor ID app-20160607161937-0010/2 on hostPort 192.168.33.100:34250 with 1 cores, 1222.0 MB RAM
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/2 is now RUNNING
16/06/07 16:26:41 INFO AppClient$ClientEndpoint: Executor updated: app-20160607161937-0010/2 is now EXITED (Command exited with code 1)
16/06/07 16:26:41 INFO SparkDeploySchedulerBackend: Executor app-20160607161937-0010/2 removed: Command exited with code 1
16/06/07 16:26:41 INFO TaskSchedulerImpl: Cancelling stage 0
16/06/07 16:26:41 INFO TaskSchedulerImpl: Stage 0 was cancelled

并且应用程序失败 .

是否有人能够提供有关如何优雅地恢复并允许应用程序继续使用剩余和/或新执行程序的任何指导?

非常感谢,

戴夫