首页 文章

通过键相交的 RDD

提问于
浏览
2

我有两个 RDD,一个 RDD 很大,另一个小得多。我想用小 RDD 中的键找到大 RDD 中的所有唯一元组。

  • RDD 太大,以至于我必须避免完全洗牌

  • 小 RDD 也足够大,我无法广播。我也许可以广播其密钥。

  • 也有重复的元组,我只关心不同的元组。

例如

large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
expected_rdd = [
    ('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
    ('b',   [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]

在我的解决方案中,有两个昂贵的操作-连接和独立操作。我都认为这会导致完全洗牌,并使子 RDD 哈希分区。鉴于此,以下是我能做的最好的事情吗?

keys = sc.broadcast(small_rdd.keys().distinct().collect())

filtered_unique_large_rdd = (large_rdd
    .filter(lambda (k, v): k in keys.value)
    .distinct()
    .groupByKey())

(filtered_unique_large_rdd
     .join(small_rdd.groupByKey())
     .mapValues(lambda x: sum([list(i) for i in x], []))
     .collect())

基本上,我显式过滤元组,选择不同的元组,然后与 smaller_rdd 联接。我希望该不同的操作将把键的哈希值分区,并且在随后的连接期间不会引起另一次洗牌。

预先感谢您提供的 suggestions/ideas。

PS:这不是spark 中的哪个功能用于通过键组合两个 RDD的副本,因为可以选择加入(完全随机播放)。

1 回答

  • 1

    在我的解决方案中,有两个昂贵的操作-连接和独立操作。

    实际上,有三个昂贵的操作。您应该将groupByKey添加到列表中。

    我希望该不同的操作将把键的哈希值分区,并且在随后的连接期间不会引起另一次洗牌。

    distinct不会,但随后的groupByKey会。问题在于它需要对数据进行两次混洗-一次对distinct一次,对groupByKey一次。

    filtered_unique_large_rdd.toDebugString()
    
    ## (8) PythonRDD[27] at RDD at PythonRDD.scala:43 []
    ##  |  MapPartitionsRDD[26] at mapPartitions at PythonRDD.scala:374 []
    ##  |  ShuffledRDD[25] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    ##  +-(8) PairwiseRDD[24] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
    ##     |  PythonRDD[23] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
    ##     |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:374 []
    ##     |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    ##     +-(8) PairwiseRDD[20] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
    ##        |  PythonRDD[19] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
    ##        |  ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
    

    您可以尝试用aggregateByKey替换distinct后跟groupByKey

    zeroValue = set()
    
    def seqFunc(acc, x):
        acc.add(x)
        return acc
    
    def combFunc(acc1, acc2):
        acc1.update(acc2)
        return acc1
    
    grouped_by_aggregate = (large_rdd
        .filter(lambda kv: k[0] in keys.value)
        .aggregateByKey(zeroValue, seqFunc, combFunc))
    

    与您当前的解决方案相比,它只需要对large_rdd进行一次随机播放:

    grouped_by_aggregate.toDebugString()
    
    ## (8) PythonRDD[54] at RDD at PythonRDD.scala:43 []
    ##  |  MapPartitionsRDD[53] at mapPartitions at PythonRDD.scala:374
    ##  |  ShuffledRDD[52] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    ##  +-(8) PairwiseRDD[51] at aggregateByKey at <ipython-input-60-67c93b2860a0 ...
    ##     |  PythonRDD[50] at aggregateByKey at <ipython-input-60-67c93b2860a0> ...
    ##     |  ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
    

    另一个可能的改进是在广播之前将密钥转换为 set:

    keys = sc.broadcast(set(small_rdd.keys().distinct().collect()))
    

    现在,您的代码会在列表中针对过滤器的每个步骤执行线性搜索。

相关问题