首页 文章

Spark Window性能问题

提问于
浏览
1

我有一个镶木地板数据框,具有以下结构:

  • ID字符串

  • 日期日期

  • 480其他类型为Double的要素列

我必须用相应的加权移动平均值替换480个特征列中的每一个,窗口为250.最初,我尝试对单个列执行此操作,使用以下简单代码:

var data = sparkSession.read.parquet("s3://data-location")
var window = Window.rowsBetween(-250, Window.currentRow - 1).partitionBy("ID").orderBy("DATE")
data.withColumn("Feature_1", col("Feature_1").divide(avg("Feature_1").over(window))).write.parquet("s3://data-out")

输入数据包含2000万行,每个ID约有4-5000个日期 . 我在AWS EMR集群(m4.xlarge实例)上运行此操作,其中一列的结果如下:

  • 4个 Actuator X 4个核心X 10 GB 1 GB用于纱线开销(每个任务2.5GB,16个并发运行任务),耗时14分钟

  • 8个 Actuator X 4个核心X 10GB 1 GB用于纱线开销(每个任务2.5GB,32个并发运行任务),耗时8分钟

我已经调整了以下设置,希望将总时间缩短:

  • spark.memory.storageFraction 0.02

  • spark.sql.windowExec.buffer.in.memory.threshold 100000

  • spark.sql.constraintPropagation.enabled false

第二个有助于防止在日志中看到一些溢出,但没有一个帮助实际的性能 .

我不明白为什么只需要2000万条记录需要这么长时间 . 我知道,对于计算加权移动平均线,它需要做20 M X 250(窗口大小)平均值和分区,但是有16个核心(第一次运行)我不明白为什么它需要这么长时间 . 我无法想象其余479个剩余的特征列需要多长时间!

我还尝试通过设置增加默认的shuffle分区:

  • spark.sql.shuffle.partitions 1000

但即使有1000个分区,它也没有带来时间 . 还尝试在调用窗口聚合之前按ID和DATE对数据进行排序,没有任何好处 .

有没有办法改善这个,或者窗口函数通常用我的用例运行缓慢?这只是20M行,远不及火花可以处理其他类型的工作负载 .

1 回答

  • 0

    您的数据集大小约为70 GB . 如果我对每个id都正确理解它,它会在所有记录的日期上进行排序,然后将前面的250条记录进行平均 . 由于您需要在超过400列上应用此功能,我建议在拼花创建时尝试分组以避免改组 . 写入桶状镶木地板文件需要相当长的时间,但对于所有480列的推导,可能不需要花费8分钟* 480执行时间 .

    请在创建镶木地板文件时尝试分组或重新分区并排序,并告诉我它是否有效 .

相关问题