首页 文章

Pyspark:重新分区vs partitionBy

提问于
浏览
17

我现在正在研究这两个概念,并希望有一些清晰度 . 通过命令行,我一直在尝试识别差异,以及开发人员何时使用repartition vs partitionBy .

以下是一些示例代码:

rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c',1), ('ef',5)])
rdd1 = rdd.repartition(4)
rdd2 = rdd.partitionBy(4)

rdd1.glom().collect()
[[('b', 1), ('ef', 5)], [], [], [('a', 1), ('a', 2), ('b', 3), ('c', 1)]]

rdd2.glom().collect()
[[('a', 1), ('a', 2)], [], [('c', 1)], [('b', 1), ('b', 3), ('ef', 5)]]

我看了两者的实现,我注意到的唯一区别是partitionBy可以采用分区功能,或者默认情况下使用portable_hash . 所以在partitionBy中,所有相同的键应该在同一个分区中 . 在重新分区中,我希望值在分区上更均匀地分布,但事实并非如此 .

鉴于此,为什么有人会使用重新分配?我想我唯一能看到它被使用的是我是不是在使用PairRDD,或者我有大数据偏差?

有什么东西我不知道,还是有人可以从不同的角度为我揭开光芒?

2 回答

  • 9

    repartition 已存在于RDD中,并且不按键(或除订购之外的任何其他标准)处理分区 . 现在,PairRDD添加了密钥的概念,并随后添加了另一种允许按该密钥分区的方法 .

    所以是的,如果您的数据是键控的,那么您应该绝对按该键进行分区,这在很多情况下是首先使用PairRDD的点(对于连接,reduceByKey等) .

  • 9

    repartition() 用于指定考虑核心数量和数据量的分区数量 .

    partitionBy() 用于使洗牌函数更有效,例如 reduceByKey()join()cogroup() 等 . 它仅在多次使用RDD的情况下有用,因此通常后跟 persist() .

    两者在行动中的差异:

    pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))
    
    pairs.partitionBy(3).glom().collect()
    [[(3, 3), (6, 6), (6, 6)],
     [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
     [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]
    
    pairs.repartition(3).glom().collect()
    [[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],
     [(1, 1), (4, 4), (6, 6), (4, 4)],
     [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]
    

相关问题