首页 文章

Spark使用上一行的值向数据框添加新列

提问于
浏览
24

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

我通过使用类似的东西来管理通常"append"新列到数据框: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种“行的移位”,以便新列具有前一行的字段值(如示例所示) . 我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容 .

任何帮助,将不胜感激 .

2 回答

  • 29

    您可以使用 lag 窗口功能,如下所示

    from pyspark.sql.functions import lag, col
    from pyspark.sql.window import Window
    
    df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
    w = Window().partitionBy().orderBy(col("id"))
    df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
    
    ## +---+---+-------+
    ## | id|num|new_col|
    ## +---+---+-------|
    ## |  2|3.0|    5.0|
    ## |  3|7.0|    3.0|
    ## |  4|9.0|    7.0|
    ## +---+---+-------+
    

    但是有一些重要的问题:

    • 如果您需要全局操作(未被其他一些列/列分区),则效率极低 .

    • 您需要一种自然的方式来订购数据 .

    虽然第二个问题几乎从来都不是问题,但第一个问题可能是一个交易破坏者 . 如果是这种情况,您只需将 DataFrame 转换为RDD并手动计算 lag . 参见例如:

    其他有用的链接:

  • 0
    val df = sc.parallelize(Seq((4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0))).toDF("id", "num")
    df.show
    +---+---+
    | id|num|
    +---+---+
    |  4|9.0|
    |  3|7.0|
    |  2|3.0|
    |  1|5.0|
    +---+---+
    df.withColumn("new_column", lag("num", 1, 0).over(w)).show
    +---+---+----------+
    | id|num|new_column|
    +---+---+----------+
    |  1|5.0|       0.0|
    |  2|3.0|       5.0|
    |  3|7.0|       3.0|
    |  4|9.0|       7.0|
    +---+---+----------+
    

相关问题