我有一个spark作业,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回hdfs . 当我这样做时,我注意到有数百个任务 .
我也不确定为什么有这么多工作?我觉得工作更像是一个动作发生的时候 . 我可以推测为什么 - 但我的理解是,在这段代码中它应该是一个工作,它应该分解为阶段,而不是多个工作 . 为什么不把它分解成各个阶段,为什么它会闯入工作岗位?
至于200多个任务,由于数据量和节点数量微乎其微,当只有一个聚合和几个过滤器时,每行数据有25个任务是没有意义的 . 为什么每个原子操作每个分区只有一个任务?
这是相关的scala代码 -
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object TestProj {object TestProj {
def main(args: Array[String]) {
/* set the application name in the SparkConf object */
val appConf = new SparkConf().setAppName("Test Proj")
/* env settings that I don't need to set in REPL*/
val sc = new SparkContext(appConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
/*the below rdd will have schema defined in Record class*/
val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
.map(x=>x.split(" ")) //file record into array of strings based spaces
.map(x=>Record(
x(0).toInt,
x(1).asInstanceOf[String],
x(2).asInstanceOf[String],
x(3).toInt))
/* the below dataframe groups on first letter of first name and counts it*/
val aggDF = rddCase.toDF()
.groupBy($"firstName".substr(1,1).alias("firstLetter"))
.count
.orderBy($"firstLetter")
/* save to hdfs*/
aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
}
case class Record(id: Int
, firstName: String
, lastName: String
, quantity:Int)
}
下面是点击应用程序后的屏幕截图
以下是查看id 0
的具体"job"时显示的阶段
下面是单击具有200多个任务的舞台时屏幕的第一部分
这是舞台内的第二部分
点击"executors"标签
后面
根据要求,以下是工作ID 1的各个阶段
以下是具有200个任务的作业ID 1中的阶段的详细信息
2 回答
这是一个经典的Spark问题 .
用于读取的两个任务(第二个图中的阶段Id 0)是
defaultMinPartitions
设置,设置为2.您可以通过读取REPLsc.defaultMinPartitions
中的值来获取此参数 . 它也应该在"Environment" tap下的Spark UI中可见 .您可以从github查看code,看看这到底发生了什么 . 如果您希望在读取时使用更多分区,只需将其添加为参数,例如
sc.textFile("a.txt", 20)
.现在有趣的部分来自第二阶段出现的200个分区(第二个阶段的阶段Id 1) . 好吧,每次有一个shuffle,Spark需要决定shuffle RDD有多少个分区 . 可以想象,默认值为200 .
您可以使用以下方法更改:
如果使用此配置运行代码,您将看到200个分区不再存在 . 如何设置此参数是一种艺术 . 也许选择2倍的核心数量(或其他) .
我认为Spark 2.0有一种方法可以自动推断shuffle RDD的最佳分区数 . 期待那样!
最后,您获得的作业数量与生成的优化Dataframe代码产生的数量有关 . 如果您阅读Spark规范,它会说每个RDD操作都会触发一个作业 . 当您的操作涉及Dataframe或SparkSQL时,Catalyst优化器将找出执行计划并生成一些基于RDD的代码来执行它 . 在你的情况下,很难确切地说它为什么会使用两个动作 . 您可能需要查看优化的查询计划,以确切了解正在执行的操作 .
我有个类似的问题 . 但在我的场景中,我并行化的集合比Spark计划的任务数量少(导致火花有时奇怪地表现) . 使用强制分区号我能解决这个问题 .
它是这样的:
然后,我在Spark日志中看到: