关于这个问题有很多帖子,但没有人回答我的问题 .
我试图将许多不同的数据帧连接在一起,在PySpark中遇到 OutOfMemoryError
.
我的本地机器有16GB的内存,我已经设置了我的Spark配置:
class SparkRawConsumer:
def __init__(self, filename, reference_date, FILM_DATA):
self.sparkContext = SparkContext(master='local[*]', appName='my_app')
SparkContext.setSystemProperty('spark.executor.memory', '3g')
SparkContext.setSystemProperty('spark.driver.memory', '15g')
很明显有很多关于Spark中OOM错误的SO帖子,但基本上大多数人都说要增加你的内存属性 .
我基本上是从50-60个较小的数据帧执行连接,这些数据帧有两列 uid
和 data_in_the_form_of_lists
(通常,它是一个Python字符串列表) . 我加入的主数据框大约有10列,但也包含 uid
列(我正在加入) .
我只是想加入1,500行数据 . 但是,当显然所有这些数据都适合内存时,我会遇到频繁的OutOfMemory错误 . 我通过查看我的存储中的SparkUI来确认这一点:
在代码中,我的联接看起来像这样:
# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
on="gid_value")
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
metric1
, metric2
和 metric3
是我在连接之前转换为数据帧的RDD(请记住,实际上有50个这些较小的 metric
dfs正在加入) .
我打电话给 metric.count()
强制评估,因为它似乎有助于防止内存错误(否则我会在尝试最终收集时遇到更多的驱动程序错误) .
错误是非确定性的 . 我没有看到它们一直出现在我的连接中的任何特定位置,有时似乎发生了我的最后一次调用,有时在较小的连接期间 .
我真的怀疑任务序列化/反序列化存在一些问题 . 例如,当我查看典型阶段的事件时间轴时,我发现其中大部分都是由任务反序列化占用的:
我还注意到垃圾收集时间非常多:
垃圾收集是导致内存错误的问题吗?还是任务序列化?
编辑以回答评论问题
我一直在运行Spark作业,作为更大的PyCharm项目的一部分(因此为什么火花上下文被包裹在一个类中) . 我重构了代码,将其作为脚本运行,使用以下spark提交:
spark-submit spark_consumer.py \
--driver-memory=10G \
--executor-memory=5G \
--conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'
1 回答
我遇到了类似的问题,它适用于:
Spark提交:
码: