环境:Spark 1.6.3,火花,150个 Actuator * 2个核心,每个6 GB(内存40%),python .

我有一个带有3列的spark-Dataframe:{int('userId'),longInt('productId'),double('CatgResult')},数据帧的长度大约为10亿 .

数据具有一个特征,即当'userId' - 'productId'对作为关键字时,整个数据帧中最多只存在另一个对应物,并且大多数'userId' - 'productId'对可能是整个数据帧中只有一对 .

现在这里是代码:

# From HIVE, to dismiss other factors.
unionTable = sqlContext.sql("SELECT * FROM tempdb.unionTable")
# The problem.
unionTable = unionTable.groupby(['userId', 'productId']).sum('CatgResult')
# Any action.
unionTalbe.show()

每当unionTable.action()时,它都会被困在'groupby() . sum()'大约10分钟 . 从spark web UI中,我可以看到在这个阶段的总共150个 Actuator 中:1 . 两个 Actuator 输入大约200~300 MB,但是随机写入2.5GB,这拖累了执行时间; 2.其他执行者的输入范围为900MB~32B,随机数写入的数据量几乎相同 .

更重要的是:

  • 我已经尝试将每个'userId' - 'productId'对合并到一个字符串键中,使用str(unionTable ['userId'])“::”str(unionTable ['productId']),但它没有帮助 .

  • 然后我附加了一个随机的int [0,10]作为字符串后缀,也没有帮助 .

  • 我将数据帧转换为rdd,使用reduceByKey,仍然存在偏斜,但需要花费更多时间 .

在整个程序中,这个groupby()或reduceByKey()占用总执行时间的95%,我不知道发生了什么 .

我的问题是:这个数据偏差严重拖累了我的程序,因为我要处理数万亿的数据,有时会出现“乱丢输出位置丢失”错误..我必须找出问题是什么以及这个数据是如何偏斜的可以解决..

Overview

The top shuffle output