我有两个 RDD,一个 RDD 很大,另一个小得多。我想用小 RDD 中的键找到大 RDD 中的所有唯一元组。
-
RDD 太大,以至于我必须避免完全洗牌
-
小 RDD 也足够大,我无法广播。我也许可以广播其密钥。
-
也有重复的元组,我只关心不同的元组。
例如
large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
expected_rdd = [
('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]
在我的解决方案中,有两个昂贵的操作-连接和独立操作。我都认为这会导致完全洗牌,并使子 RDD 哈希分区。鉴于此,以下是我能做的最好的事情吗?
keys = sc.broadcast(small_rdd.keys().distinct().collect())
filtered_unique_large_rdd = (large_rdd
.filter(lambda (k, v): k in keys.value)
.distinct()
.groupByKey())
(filtered_unique_large_rdd
.join(small_rdd.groupByKey())
.mapValues(lambda x: sum([list(i) for i in x], []))
.collect())
基本上,我显式过滤元组,选择不同的元组,然后与 smaller_rdd 联接。我希望该不同的操作将把键的哈希值分区,并且在随后的连接期间不会引起另一次洗牌。
预先感谢您提供的 suggestions/ideas。
PS:这不是spark 中的哪个功能用于通过键组合两个 RDD的副本,因为可以选择加入(完全随机播放)。
1 回答
实际上,有三个昂贵的操作。您应该将
groupByKey
添加到列表中。distinct
不会,但随后的groupByKey
会。问题在于它需要对数据进行两次混洗-一次对distinct
一次,对groupByKey
一次。您可以尝试用
aggregateByKey
替换distinct
后跟groupByKey
:与您当前的解决方案相比,它只需要对
large_rdd
进行一次随机播放:另一个可能的改进是在广播之前将密钥转换为 set:
现在,您的代码会在列表中针对过滤器的每个步骤执行线性搜索。