首页 文章

为什么我的火花工作有这么多的任务?默认情况下获取200个任务

提问于
浏览
16

我有一个spark作业,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回hdfs . 当我这样做时,我注意到有数百个任务 .

我也不确定为什么有这么多工作?我觉得工作更像是一个动作发生的时候 . 我可以推测为什么 - 但我的理解是,在这段代码中它应该是一个工作,它应该分解为阶段,而不是多个工作 . 为什么不把它分解成各个阶段,为什么它会闯入工作岗位?

至于200多个任务,由于数据量和节点数量微乎其微,当只有一个聚合和几个过滤器时,每行数据有25个任务是没有意义的 . 为什么每个原子操作每个分区只有一个任务?

这是相关的scala代码 -

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase =  sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
      .map(x=>x.split(" "))    //file record into array of strings based spaces
      .map(x=>Record(
        x(0).toInt,
        x(1).asInstanceOf[String],
        x(2).asInstanceOf[String],
        x(3).toInt))


    /* the below dataframe groups on first letter of first name and counts it*/
    val aggDF = rddCase.toDF()
      .groupBy($"firstName".substr(1,1).alias("firstLetter"))
      .count
      .orderBy($"firstLetter")

    /* save to hdfs*/ 
 aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")

  }

    case class Record(id: Int
      , firstName: String
      , lastName: String
      , quantity:Int)

}

下面是点击应用程序后的屏幕截图
enter image description here

以下是查看id 0
enter image description here
的具体"job"时显示的阶段

下面是单击具有200多个任务的舞台时屏幕的第一部分

enter image description here

这是舞台内的第二部分
enter image description here

点击"executors"标签
enter image description here
后面

根据要求,以下是工作ID 1的各个阶段

enter image description here

以下是具有200个任务的作业ID 1中的阶段的详细信息

enter image description here

2 回答

  • 1

    这是一个经典的Spark问题 .

    用于读取的两个任务(第二个图中的阶段Id 0)是 defaultMinPartitions 设置,设置为2.您可以通过读取REPL sc.defaultMinPartitions 中的值来获取此参数 . 它也应该在"Environment" tap下的Spark UI中可见 .

    您可以从github查看code,看看这到底发生了什么 . 如果您希望在读取时使用更多分区,只需将其添加为参数,例如 sc.textFile("a.txt", 20) .

    现在有趣的部分来自第二阶段出现的200个分区(第二个阶段的阶段Id 1) . 好吧,每次有一个shuffle,Spark需要决定shuffle RDD有多少个分区 . 可以想象,默认值为200 .

    您可以使用以下方法更改:

    sqlContext.setConf("spark.sql.shuffle.partitions", "4”)
    

    如果使用此配置运行代码,您将看到200个分区不再存在 . 如何设置此参数是一种艺术 . 也许选择2倍的核心数量(或其他) .

    我认为Spark 2.0有一种方法可以自动推断shuffle RDD的最佳分区数 . 期待那样!

    最后,您获得的作业数量与生成的优化Dataframe代码产生的数量有关 . 如果您阅读Spark规范,它会说每个RDD操作都会触发一个作业 . 当您的操作涉及Dataframe或SparkSQL时,Catalyst优化器将找出执行计划并生成一些基于RDD的代码来执行它 . 在你的情况下,很难确切地说它为什么会使用两个动作 . 您可能需要查看优化的查询计划,以确切了解正在执行的操作 .

  • 24

    我有个类似的问题 . 但在我的场景中,我并行化的集合比Spark计划的任务数量少(导致火花有时奇怪地表现) . 使用强制分区号我能解决这个问题 .

    它是这样的:

    collection = range(10) # In the real scenario it was a complex collection
    sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario
    

    然后,我在Spark日志中看到:

    INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks
    

相关问题