首页 文章

PySpark takeOrdered Multiple Fields(Ascending and Descending)

提问于
浏览
0

来自pyspark.RDD的takeOrdered方法从RDD获取N个元素,这些元素按升序排列或由可选键函数指定,如pyspark.RDD.takeOrdered所述 . 该示例使用一个键显示以下代码:

>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]

是否也可以定义更多的密钥,例如x,y,z是否有3列数据?

键应该是不同的顺序,例如x = asc,y = desc,z = asc . 这意味着如果两行的第一个值x相等,那么第二个值y应该按降序使用 .

1 回答

  • 2

    对于数字,您可以写:

    n = 1
    rdd = sc.parallelize([
        (-1, 99, 1), (-1, -99, -1), (5, 3, 8), (-1, 99, -1)
    ])
    
    rdd.takeOrdered(n, lambda x: (x[0], -x[1], x[2]))
    # [(-1, 99, -1)]
    

    对于其他对象,您可以定义某种类型的记录类型并定义您自己的一组丰富的比较方法:

    class XYZ(object):
        slots = ["x", "y", "z"]
    
        def __init__(self, x, y, z):
            self.x, self.y, self.z = x, y, z
    
        def __eq__(self, other):
            if not isinstance(other, XYZ):
                return False
            return self.x == other.x and self.y == other.y and self.z == other.z
    
        def __lt__(self, other):
            if not isinstance(other, XYZ):
                raise ValueError(
                    "'<' not supported between instances of 'XYZ' and '{0}'".format(
                        type(other)
                ))
            if self.x == other.x:
                if self.y == other.y:
                    return self.z < other.z
                else:
                    return self.y > other.y
            else:
                return self.x < other.x
    
        def __repr__(self):
            return "XYZ({}, {}, {})".format(self.x, self.y, self.z)
    
        @classmethod
        def from_tuple(cls, xyz):
            x, y, z = xyz
            return cls(x, y, z)
    

    然后:

    from xyz import XYZ
    
    rdd.map(XYZ.from_tuple).takeOrdered(n)
    # [XYZ(-1, 99, -1)]
    

    在实践中只使用SQL:

    from pyspark.sql.functions import asc, desc
    
    rdd.toDF(["x", "y", "z"]).orderBy(asc("x"), desc("y"), asc("z")).take(n)
    # [Row(x=-1, y=99, z=-1)]
    

相关问题