首页 文章

执行许多数据帧连接时PySpark OutOfMemoryErrors

提问于
浏览
2

关于这个问题有很多帖子,但没有人回答我的问题 .

我试图将许多不同的数据帧连接在一起,在PySpark中遇到 OutOfMemoryError .

我的本地机器有16GB的内存,我已经设置了我的Spark配置:

class SparkRawConsumer:

    def __init__(self, filename, reference_date, FILM_DATA):
        self.sparkContext = SparkContext(master='local[*]', appName='my_app')
        SparkContext.setSystemProperty('spark.executor.memory', '3g')
        SparkContext.setSystemProperty('spark.driver.memory', '15g')

很明显有很多关于Spark中OOM错误的SO帖子,但基本上大多数人都说要增加你的内存属性 .

我基本上是从50-60个较小的数据帧执行连接,这些数据帧有两列 uiddata_in_the_form_of_lists (通常,它是一个Python字符串列表) . 我加入的主数据框大约有10列,但也包含 uid 列(我正在加入) .

我只是想加入1,500行数据 . 但是,当显然所有这些数据都适合内存时,我会遇到频繁的OutOfMemory错误 . 我通过查看我的存储中的SparkUI来确认这一点:

Spark UI screenshot

在代码中,我的联接看起来像这样:

# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
                on="gid_value")

metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
                self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
                on="uid")

metrics_df.count()
metrics_df.repartition("gid_value")

metric1metric2metric3 是我在连接之前转换为数据帧的RDD(请记住,实际上有50个这些较小的 metric dfs正在加入) .

我打电话给 metric.count() 强制评估,因为它似乎有助于防止内存错误(否则我会在尝试最终收集时遇到更多的驱动程序错误) .

错误是非确定性的 . 我没有看到它们一直出现在我的连接中的任何特定位置,有时似乎发生了我的最后一次调用,有时在较小的连接期间 .

我真的怀疑任务序列化/反序列化存在一些问题 . 例如,当我查看典型阶段的事件时间轴时,我发现其中大部分都是由任务反序列化占用的:

Spark UI screenshot serialization

我还注意到垃圾收集时间非常多:

Spark UI screenshot garbage collection

垃圾收集是导致内存错误的问题吗?还是任务序列化?

编辑以回答评论问题

我一直在运行Spark作业,作为更大的PyCharm项目的一部分(因此为什么火花上下文被包裹在一个类中) . 我重构了代码,将其作为脚本运行,使用以下spark提交:

spark-submit spark_consumer.py \
  --driver-memory=10G \
  --executor-memory=5G \
  --conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'

1 回答

  • 1

    我遇到了类似的问题,它适用于:
    Spark提交:

    spark-submit --driver-memory 3g\
                --executor-memory 14g\
                *.py
    

    码:

    sc = SparkContext().getOrCreate()
    

相关问题