
PySpark: sorting with reduceByKey


I have an RDD like the following:

dataSource = sc.parallelize([("user1", (3, "blue")), ("user1", (4, "black")), ("user2", (5, "white")), ("user2", (3, "black")), ("user2", (6, "red")), ("user1", (1, "red"))])

I want to use reduceByKey to find the top 2 colors for each user, so the output would be an RDD like:

sc.parallelize([("user1", ["black", "blue"]), ("user2", ["red", "white"])])

So I need to reduce by key, then sort each key's (number, color) values by the number and return the top n colors.

I don't want to use groupBy. If there is something better than reduceByKey, other than groupBy, that would be great :)
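
For reference, the groupByKey version I want to avoid looks roughly like this (heapq.nlargest and the name top_colors are only illustrative):

# groupByKey sketch, shown only for comparison; this is the approach I'd like to replace
import heapq

top_colors = (dataSource
    .groupByKey()
    .mapValues(lambda vals: [color for _, color in heapq.nlargest(2, vals)]))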

1 Answer


    For example, you can use a heap queue. Required imports:

    import heapq
    from functools import partial
    

    Helper functions:

    def zero_value(n):
        """Initialize a queue. If n is large
        it could be more efficient to track a number of the elements
        on heap (cnt, heap) and switch between heappush and heappushpop
        if we exceed n. I leave this as an exercise for the reader."""
        return [(float("-inf"), None) for _ in range(n)]
    
    def seq_func(acc, x):
        # Push the new (count, color) pair and drop the smallest element,
        # so the heap always holds the n largest pairs seen so far.
        heapq.heappushpop(acc, x)
        return acc

    def merge_func(acc1, acc2, n):
        # Combine two per-partition accumulators and keep only the n largest pairs.
        return heapq.nlargest(n, heapq.merge(acc1, acc2))

    def finalize(kvs):
        # Drop the -inf placeholders and return the colors ordered by count,
        # highest first. Sorting here keeps the order deterministic even when
        # a key's values never pass through merge_func.
        return [v for (k, v) in sorted(kvs, reverse=True) if k != float("-inf")]
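
    A quick local sanity check of the helpers, with no Spark involved (the sample tuples are just user2's values from the question):

    acc = zero_value(2)
    for x in [(5, "white"), (3, "black"), (6, "red")]:
        acc = seq_func(acc, x)
    # Merging with a fresh zero_value mimics the cross-partition combine step.
    print(finalize(merge_func(acc, zero_value(2), n=2)))
    # ['red', 'white']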
    

    Data:

    rdd = sc.parallelize([
        ("user1", (3, "blue")), ("user1", (4, "black")),
        ("user2", (5, "white")), ("user2", (3, "black")),
        ("user2", (6, "red")), ("user1", (1, "red"))])
    

    Solution:

    (rdd
        # zero_value(2) seeds each key with a size-2 heap; seq_func and
        # merge_func then keep only the 2 largest (count, color) pairs per key.
        .aggregateByKey(zero_value(2), seq_func, partial(merge_func, n=2))
        .mapValues(finalize)
        .collect())
    

    Result:

    [('user2', ['red', 'white']), ('user1', ['black', 'blue'])]
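
    As an aside, if switching to DataFrames is acceptable, the same top-n-per-key pattern can be written with a window function. This is only a sketch: it assumes an active SparkSession (needed for toDF), and note that collect_list does not guarantee ordering, so sort afterwards if the descending order matters:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    df = rdd.map(lambda kv: (kv[0], kv[1][0], kv[1][1])).toDF(["user", "cnt", "color"])
    w = Window.partitionBy("user").orderBy(F.col("cnt").desc())

    top2 = (df
        .withColumn("rn", F.row_number().over(w))   # rank colors per user by count
        .where(F.col("rn") <= 2)                    # keep the top 2 rows per user
        .groupBy("user")
        .agg(F.collect_list("color").alias("colors")))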
    
