让我们假设以下每个时间点只运行一个Spark作业 .
到目前为止我得到了什么
以下是我了解Spark中发生的事情:
-
创建
SparkContext
时,每个工作节点启动执行程序 . 执行程序是单独的进程(JVM),它连接回驱动程序 . 每个执行程序都有驱动程序的jar . 退出驱动程序,关闭执行程序 . 每个执行程序都可以保存一些分区 . -
执行作业时,将根据沿袭图创建执行计划 .
-
执行作业分为几个阶段,其中阶段包含尽可能多的邻近(在谱系图中)转换和动作,但没有随机播放 . 因此阶段通过混洗分开 .
我明白那个
任务是通过序列化Function对象从驱动程序发送到执行程序的命令 . 执行程序反序列化(使用驱动程序jar)命令(任务)并在分区上执行它 .
但
问题
如何将舞台分成这些任务?
特别:
-
由转换和操作确定的任务是否可以在任务中进行多个转换/操作?
-
是由分区确定的任务(例如,每个分区每个阶段一个任务) .
-
节点是否确定了任务(例如每个节点每个阶段一个任务)?
我的想法(只有部分答案,即使是正确的)
在https://0x0fff.com/spark-architecture-shuffle中,随着图像解释了随机播放
我得到了规则的印象
每个阶段被分成#count-of-partitions任务,不考虑节点数量
对于我的第一张图片,我会说我有3个 Map 任务和3个减少任务 .
对于来自0x0fff的图像,我会说有8个 Map 任务和3个减少任务(假设只有三个橙色和三个深绿色文件) .
无论如何都要提出问题
那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,无论多个操作(例如多个 Map )是在一个任务内还是每个操作分成一个任务 .
别人怎么说
What is a task in Spark? How does the Spark worker execute the jar file?和How does the Apache Spark scheduler split files into tasks?是相似的,但我觉得我的问题在那里得不到清楚 .
3 回答
你有一个非常好的大纲 . 回答你的问题
stage
的每个数据分区启动单独的task
. 考虑每个分区可能位于不同的物理位置 - 例如HDFS中的块或本地文件系统的目录/卷 .请注意,
Stage
的提交由DAG Scheduler
驱动 . 这意味着可以将不相互依赖的阶段提交到集群以便并行执行:这最大化了集群上的并行化功能 . 因此,如果我们的数据流中的操作可以同时发生,我们将期望看到启动多个阶段 .我们可以看到以下玩具示例中的操作,其中我们执行以下类型的操作:
加载两个数据源
分别对两个数据源执行一些映射操作
加入他们
对结果执行一些映射和过滤操作
保存结果
那么我们最终会有多少阶段?
每个阶段用于并行加载两个数据源= 2个阶段
代表
join
的第三阶段,取决于其他两个阶段注意:处理联接数据的所有后续操作可能在同一阶段执行,因为它们必须按顺序执行 . 启动其他阶段没有任何好处,因为他们无法在先前的操作完成之前开始工作 .
这是玩具计划
这是结果的DAG
现在:多少 tasks ?任务数量应该等于
(
Stage
*#Partitions in the stage
)的总和如果我理解正确,有2个(相关的)事情让你感到困惑:
1)什么决定了任务的内容?
2)什么决定了要执行的任务数量?
Spark的引擎"glues"一起 simple 连续rdds上的操作,例如:
因此当rdd3(懒惰地)计算时,spark将为rdd1的每个分区生成一个任务,并且每个任务将执行过滤器和每行的映射以产生rdd3 .
任务数由分区数决定 . 每个RDD都有一个定义的分区数量 . 对于从HDFS读取的源RDD(例如,使用sc.textFile(...)),分区数是由输入格式生成的分割数 . RDD上的某些操作可能导致RDD具有不同数量的分区:
另一个例子是加入:
(大多数)改变分区数量的操作涉及一个shuffle,当我们这样做时:
实际发生的是rdd1的每个分区上的任务需要产生一个可以被下一阶段读取的结束输出,以使rdd2有1000个分区(他们是如何做到的?Hash或Sort) . 这方面的任务有时被称为"Map ( side ) tasks" . 稍后在rdd2上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的 Map 侧输出 . 这方面的任务有时被称为"Reduce ( side ) tasks" .
这两个问题是相关的:一个阶段中的任务数量是分区的数量(连续的rdds“胶合”在一起)和rdd的分区数量可以在阶段之间改变(通过指定一些分区的数量) shuffle导致操作例如) .
一旦阶段的执行开始,其任务可以占用任务时隙 . 并发任务槽的数量是numExecutors * ExecutorCores . 通常,这些可以由来自不同的非依赖阶段的任务占据 .
这可能有助于您更好地理解不同的部分:
阶段:是一组任务 . 针对不同数据子集(分区)运行的相同进程 .
任务:表示分布式数据集分区上的工作单元 . 因此,在每个阶段,任务数量=分区数,或者您说“每个分区每个阶段一个任务” .
每个 Actuator 在一个纱线容器上运行,每个容器驻留在一个节点上 .
每个阶段使用多个执行者,每个执行者分配多个vcores .
每个vcore一次只能执行一个任务
因此,在任何阶段,都可以并行执行多个任务 . 正在运行的任务数=正在使用的正在使用的数量 .