首页 文章

Pyspark . 内存不足的问题 . 如何确保覆盖表

提问于
浏览
0

我目前试图了解Spark计算的过程以及对内存消耗的影响 .

我在Zeppelin中使用Spark 2.3.2和Python 2.7 .

基本上在以下循环中我创建了集合 . 我正在使用sci-kit-learn构建机器学习模型,并且在sci-kit-learn计算之后我在pyspark-dataframes上进行了大量的数据帧操作 . 对于每一个我,我得到一个表rsmeMaeStep与8行和10列与小字符串或双值 . rsmeMaeAll只是将单个分析添加到一起,并且具有8 * 26 = 208行,其中10列用于i = 26 .

for i in range(26):
    df_features_train, df_features_validation = randomizer(dataFiltered)
    rsmeMaeStep, rsmeMaeAll = rsmeMaeAnalysis(rsmeMaeAll,df_features_train,df_features_test)
    print(i)

我为代码做了一些时间分析 . 对于i = 1,i = 10:2:40分钟需要17秒,i = 26需要6:42 . (即10或26个循环的时间长9.4或23.6倍 . )到目前为止,一切都如预期的那样 . 我在下一步有问题 . 以下代码应该只是对8到206行的简单聚合 . 对于i = 1,花费32秒,因为i = 7 4:43(长8.8倍)但是对于i = 26,我在47分钟后有0%或者因内存不足而失败 .

rsmeMae = rsmeMaeAll.select('set','setting','sme').orderBy('set','setting')
import pyspark.sql.functions as f
rsmeMaeAverage = rsmeMae.groupBy('setting','set').agg(f.count(('setting')).alias('nrOfRand'), f.round(f.mean('sme'),2).alias('rsme'),f.round(f.stddev('sme'),2).alias('sigmaRsme')).orderBy('set','setting')
z.show(rsmeMaeAverage)

基于我认为所有表的逻辑应该在每个循环中被覆盖 . 每个循环只有小的rsmeMaeAll会增加一点 . 但它仍然是一个非常小的桌子 .

但Spark的表现可能与众不同 .

据我了解情况,第一步的sk-learn代码在第一步执行 . 如果我正确理解了火花延迟评估,那么当我想打印结果时,我的代码中的pySpark操作就会开始执行 . 因此,Spark可能会保存内存中所有循环的所有表 . 是对的吗?

如果我是对的,我需要代码在每个循环结束时直接计算pySpark代码 .

我怎么能这样做?

如果我这样做会在下一个循环中引发覆盖表,或者每个循环的内存消耗是否仍会上升?我是否需要主动删除内存中的表格以及如何删除表格?

edit: 我刚刚整合

rsmeMaeStep.collect()
rsmeMaeAll.collect()

进入循环以确保pyspark计算立即完成 . 但是第一个循环需要55秒 . 第7次花了超过10分钟,它在49分钟后在第8圈的rsmeMaeAll.collect()中cru . 带有错误消息:

Py4JJavaError:调用o13488.collectToPython时发生错误 . :java.lang.OutOfMemoryError:Java堆空间

我真的不明白每个循环的指数上升时间 . 在我至少能够运行10个循环之前 . 那里发生了什么?

1 回答

  • 0

    我认为这个问题与Spark的懒惰评估有关 . 由于我收集了所有信息pyspark数据帧rsmeMaeAll,当我尝试计算输出时,生成rsmeMaeAll所需的所有信息都可能同时加载到缓存中 .

    基于这个想法,我以一种Spark不再需要保持所有步骤的方式重建代码 . 此外,我集成了一个时间测量,并在两个变体中重建旧代码,使一个变体更接近新逻辑和每个变体,计算必须在每个循环结束时完成 .

    解决方案如下:

    for i in range(9):
        ti0 = time.time()
        df_features_train, df_features_test = randomizer(dataFiltered)
        rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
        rsmeMaeAllpd = rsmeMaeAllpd.append(rsmeMaeStep.toPandas())
        print(rsmeMaeAllpd)
        ti1 = time.time()
        print "Time for loop", i, ":", ti1-ti0
    

    在rsmeMaeAnalysis中,我刚刚计算了分析结果,返回它们,将它们转换为Pandas数据帧并收集了Pandas中的所有结果 . 结果是每个循环或多或少地相同的时间,甚至在20个循环后我没有内存问题 . 前十个循环的时间如下:

    41s,42s,44s,40s,43s,43s,40s,39s,40s,40s

    但后来我想确保在pyspark数据帧中收集结果确实是问题,因此我尽可能接近pandas-solution构建代码,但是在pyspark数据帧中收集结果:

    for i in range(10):
        ti0 = time.time()
        df_features_train, df_features_test = randomizer(dataFiltered)
        rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
        rsmeMaeAll = rsmeMaeAll.union(rsmeMaeStep)
        rsmeMaeAll.show(80,False)
        ti1 = time.time()
        print "Time for loop", i, ":", ti1-ti0
    

    前八个循环的时间如下:

    43s,63s,88s,144s,162s,175s,212s,276s

    在原始变体中,仅使用时间测量,它需要以下时间,直到第7次循环后出现内存不足错误:

    44s,60s,73s,98s,128s,157s,198s

    最后,它似乎是懒惰的评估导致生成rsmeMaeAll所需的大量信息同时被加载到缓存中,尽管大多数信息在每个循环结束时都不相关 .

相关问题