我有一个由超过1.4亿个键值对组成的RDD . 根据我的分析,我发现只有大约500个唯一键 . 我必须对这个RDD进行排序 .

我尝试使用groupByKey()根据键对所有键值对进行分组,然后使用sortBy()对所有键进行排序 . 排序后,我使用map()输出排序的键值对 .
代码如下所示:

def func(list_x):
 output = []
 for element in list_x[1]:
  output.append((list_x[0], element))
 return output 


textFile = context.textFile(input_features)\
.map(lambda line: score_data(line)) \
.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y)\
.sortBy(lambda line: line[0]) \
.map(lambda x : func((x[0], list(x[1]))))\
.saveAsTextFile(output_features)

通过这种方法,我在火花中得到了这个错误

Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

此错误是因为有一些键在groupByKey()之后保存了1000万个值 .

我用 combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y) 替换了groupbykey . 但现在这项工作仍在运行超过1小时 . 正常分选只需约30分钟 .

由于键的数量相对于键值对的数量非常小,因此我们可以实现类似于计数排序的东西 .

我申请了repartition() . 这减少了完成火花工作所需的时间但是我觉得通过将计数排序作为线性复杂性的操作来进一步减少它可以进一步减少 .

有没有办法在pyspark中实现它?