首页 文章

Pyspark:计算行最小值,忽略零和空值

提问于
浏览
1

我想基于数据框中现有的列子集创建一个新列(v5) .

示例数据帧:

+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
|  2|  4|7.0|4.0|
| 99|  0|2.0|0.0|
|189|  0|2.4|0.0|
+---+---+---+---+

提供示例数据帧的另一个视图:

+---+---+---+---+
| v1| v3| v2| v4|
+---+---+---+---+
|  2|7.0|  4|4.0|
| 99|2.0|  0|0.0|
|189|2.4|  0|0.0|
+---+---+---+---+

由以下人员创建:

rdd1 = sc.parallelize([
                (2,   7.0, 4, 4.0), 
                (99,  2.0, 0, 0.0), 
                (189, 2.4, 0, 0.0)])
d = sqlContext.createDataFrame(rdd1, ('v1', 'v3','v2','v4'))

最后,我想要做的是创建另一个列v5,它是对应于v1和v2的最小值的值,忽略列中任何一个中的零和空值 . 假设v1为键,v3为值对 . 类似地,v2是键,v4是值 . 例如,在第一行中:在v1和v2中,最小值属于v1,即2,因此v5列中的输出应为7.0同样,在第二行中:忽略v1和v2的零值和空值,输出应为是2.0

原始数据帧有五列作为键,相应的五列作为值Desired output:

+---+---+---+---+---+
| v1| v2| v3| v4| v5|
+---+---+---+---+---+
|  2|  4|7.0|4.0|7.0|
| 99|  0|2.0|0.0|2.0|
|189|  0|2.4|0.0|2.4|
+---+---+---+---+---+

我试图通过udf中的最少函数来实现这一点,但是无法实现这一点 . 我正在使用pyspark 1.6 . 任何帮助深表感谢 .

1 回答

  • 2

    有了数据:

    df = spark.createDataFrame([
      (2, 4, 3.0, .0), (99, 0, 2.0, 0.0), (189, 0, 2.4, 0.0)],
      ("v1", "v2", "v3", "v4")
    )
    

    你可以用 -Inf+Inf 代替 NULL / 0 .

    from pyspark.sql.functions import col, lit, least, greatest, when
    
    cols = ["v3", "v4"]
    
    min_ = least(*[
        when(col(c).isNull() | (col(c) == 0), float("inf")).otherwise(col(c))
        for c in cols
    ]).alias("min")
    
    max_ = greatest(*[
        when(col(c).isNull() | (col(c) == 0), float("-inf")).otherwise(col(c))
        for c in cols
    ]).alias("max")
    

    并选择:

    df.select("*", min_, max_).show()
    # +---+---+---+---+---+---+
    # | v1| v2| v3| v4|min|max|
    # +---+---+---+---+---+---+
    # |  2|  4|3.0|7.0|3.0|7.0|
    # | 99|  0|2.0|0.0|2.0|2.0|
    # |189|  0|2.4|0.0|2.4|2.4|
    # +---+---+---+---+---+---+
    

相关问题