我试图找出在我的集群节点上配置内存的最佳方法 . 但是我相信,为此,我需要进一步了解一些事情,例如spark如何处理任务中的内存 .
例如,假设我有3个 Actuator ,每个 Actuator 可以并行运行多达8个任务(即8个核心) . 如果我有一个带有24个分区的RDD,这意味着理论上所有分区都可以并行处理 . 但是,如果我们在此处放大一个执行程序,则假定每个任务都可以将其分区放在内存中以对其进行操作 . 如果不是那么这意味着不会发生并行的8个任务,并且需要一些调度 .
因此,我得出结论,如果一个人寻求真正的并行性,对分区大小有所了解会有所帮助,因为它会告诉您如何确定执行者的大小以实现真正的并行性 .
-
Q0 - 我只想更好地理解,当一个 Actuator 内并非所有分区都能适应内存时会发生什么?有些是在磁盘上溢出而有些是在内存中进行操作吗? spark会为每个任务保留内存吗?如果它检测到没有足够的内存,它会安排任务吗?或者只是在内存不足错误中运行 .
-
Q1 - 执行程序中的真正并行性是否还取决于执行程序可用的内存量?换句话说,我的群集中可能有8个核心,但如果我没有足够的内存来同时加载8个数据分区,那么我将不会完全并行 .
作为最后一点,我已经多次看到以下声明,但发现它有点令人困惑:
“增加分区数量也有助于减少内存不足错误,因为这意味着Spark将为每个执行程序在较小的数据子集上运行 . ”
这是如何工作的?我的意思是spark可以在较小的子集上工作,但如果总分区集合无论如何都不适合内存,会发生什么?
1 回答
为什么要增加任务(分区)的数量?
我想先回答最后一个令你困惑的问题 . 以下是另一个question的引用:
事实上,默认情况下,Spark会将to split input data automatically尝试为一些最佳分区数:
可以指定正在执行的操作的分区数(例如
cogroup
:def cogroup[W](other: RDD[(K, W)], numPartitions: Int)
),并且在任何RDD转换后也可以执行.repartition()
.此外,他们在文件的同一段后面说:
In summary :
通常建议每个CPU使用
Spark如何处理不适合内存的输入?
In short ,通过划分输入和中间结果(RDD) . 通常,每个小块都适合执行程序可用的内存,并且可以快速处理 .
Spark能够caching the RDDs它已计算出来 . 默认情况下,每次重复使用RDD时,都会重新计算(不会缓存);调用
.cache()
或.persist()
可以帮助保持已在内存中或磁盘上计算的结果 .在内部,每个执行程序都有一个在执行和存储之间浮动的内存池(有关详细信息,请参阅here) . 当没有足够的内存用于任务执行时,Spark首先尝试逐出某些存储缓存,然后将任务数据溢出到磁盘上 . 有关详细信息,请参阅slides . Actuator 和存储器之间的 balancer 在blog post中有详细描述,它也有一个很好的例子:
OutOfMemory通常不是直接因为大量输入数据而发生,而是因为分区不佳以及因此大的辅助数据结构,如减速器上的
HashMap
(here文档再次建议有比 Actuator 更多的分区) . 所以,不,OutOfMemory不会因为大输入而发生,但处理起来可能会非常慢(因为它必须从磁盘写入/读取) . 他们还建议使用小到200毫秒(运行时间)的任务对于Spark来说是好的 .Outline :正确拆分数据:每个核心超过1个分区,每个任务的运行时间应> 200 ms . 默认分区很好起点,手动调整参数 .
(我建议在1/8群集上使用1/8的输入数据子集来查找最佳分区数 . )
同一执行者内的任务是否相互影响?
Short answer :他们这样做 . 有关更多详细信息,请查看上面提到的slides(从幻灯片#32开始) .
所有N个任务都获得了可用内存的第N部分,从而影响了彼此的"parallelism" . 如果我解释你对 true parallelism 的想法,那就是"full utilization of CPU resources" . 在这种情况下,是的,小内存池将导致磁盘上的数据溢出,并且计算变为IO绑定(而不是受CPU限制) .
进一步阅读
我强烈推荐整个章节Tuning Spark和Spark Programming Guide . 另请参阅Alexey Grishchenko撰写的关于Spark Memory Management的博客文章 .