首页 文章

Apache Spark:如何在代码中取消作业并终止正在运行的任务?

提问于
浏览
1

我在客户端模式下使用Yarn(版本2.6.0)在Hadoop集群上运行Spark应用程序(版本1.6.0) . 我有一段代码运行一个很长的计算,如果它需要太长时间我想杀死它(然后运行一些其他函数) .
这是一个例子:

val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()

try {
    Await.result(future, Duration(30, TimeUnit.SECONDS))
    println("success!")
} catch {
    case _:Throwable =>
        future.cancel()
        println("timeout")
}

// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()

超时设置为30秒,但当然计算是无限的,因此等待将来的结果将抛出一个异常,将被捕获,然后将取消未来并且将执行备份功能 .
这一切都运行得很好,除了取消的作业没有完全终止:当查看应用程序的Web UI时,作业被标记为失败,但我可以看到内部仍有运行的任务 .

当我使用SparkContext.cancelAllJobs或SparkContext.cancelJobGroup时会发生同样的事情 . 问题是,即使我设法继续我的程序,取消的工作的运行任务仍然占用宝贵的资源(这将最终使我减慢到接近停止) .

总结一下:如何以一种终止该作业的所有正在运行的任务的方式杀死Spark作业? (与现在发生的情况相反,这会阻止作业运行新任务,但让当前正在运行的任务完成)

更新:
经过很长一段时间忽略了这个问题,我们发现了一个混乱但有效的小解决方法 . 我们只是在发生超时时记录所有活动阶段的阶段ID,而不是尝试从Spark应用程序中删除相应的Spark Job / Stage,并向用于查杀的Spark Web UI提供的URL发出HTTP GET请求所说的阶段 .

1 回答

  • 0

    根据setJobGroup:

    “如果作业组的interruptOnCancel设置为true,则作业取消将导致在作业的执行程序线程上调用Thread.interrupt() . ”

    因此, Map 中的anno功能必须是可以中断的,如下所示:

    val future = sc.parallelize(lst).map(while (!Thread.interrupted) _).collectAsync()
    

相关问题