首页 文章

Pyspark基于第二个DataFrame的列向一个DataFrame添加值

提问于
浏览
1

我有两个pyspark DataFrames,如下所示:

DataFrame A:

+-----+------+
|nodes|counts|
+-----+------+
|  [0]|     1|
|  [1]|     0|
|  [2]|     1|
|  [3]|     0|
|  [4]|     0|
|  [5]|     0|
|  [6]|     1|
|  [7]|     0|
|  [8]|     0|
|  [9]|     0|
| [10]|     0|

和DataFrame B:

+----+------+
|nodes|counts|
+----+------+
|[0] |     1|
|[1] |     0|
|[2] |     3|
|[6] |     0|
|[8] |     2|
+----+------+

我想创建一个新的DataFrame C,使DataFrame A中“counts”列中的值与DataFrame B的“counts”列中的值相加,其中“nodes”列相等,使得DataFrame C看起来像:

+-----+------+
|nodes|counts|
+-----+------+
|  [0]|     2|
|  [1]|     0|
|  [2]|     4|
|  [3]|     0|
|  [4]|     0|
|  [5]|     0|
|  [6]|     1|
|  [7]|     0|
|  [8]|     2|
|  [9]|     0|
| [10]|     0|

我很感激帮助!我已经尝试了一些使用lambda函数和sql语句的不同技巧,并且在解决方案方面做得不够 .

2 回答

  • 0

    可能有一种更有效的方法,但这应该有效:

    import pyspark.sql.functions as func
    
    dfA = spark.createDataFrame([([0], 1),([1], 0),([2], 1),([3], 0), ([4], 0),([5], 0),([6], 1),([7], 0), ([8], 0),([9], 0),([10], 0)], ["nodes", "counts"])
    dfB = spark.createDataFrame([([0], 1),([1], 0),([2], 3),([6], 0), ([8], 2)], ["nodes", "counts"])
    
    dfC = dfA.join(dfB, dfA.nodes == dfB.nodes, "left")\
        .withColumn("sum",func.when(dfB.nodes.isNull(), dfA.counts).otherwise(dfA.counts+ dfB.counts))\
        .select(dfA.nodes.alias("nodes"), func.col("sum").alias("counts"))
    
    dfC.orderBy("nodes").show()
    +-----+------+
    |nodes|counts|
    +-----+------+
    |  [0]|     2|
    |  [1]|     0|
    |  [2]|     4|
    |  [3]|     0|
    |  [4]|     0|
    |  [5]|     0|
    |  [6]|     1|
    |  [7]|     0|
    |  [8]|     2|
    |  [9]|     0|
    | [10]|     0|
    +-----+------+
    
  • 1

    您可以 join 这两个数据帧如下所示,并将 null 替换为 0 并添加两列以获取 sum

    A.join(B.withColumnRenamed("count", "countB"), Seq("nodes"), "left")
      .na.fill(0)
      .withColumn("count", $"count" + $"countB")
      .drop("countB")
      .show(false)
    

    您还可以使用 union 然后将groupBy节点合并为单个数据帧并计算 sum ,如下所示

    A.union(B).groupBy("nodes").agg(sum($"count").alias("count"))
      .orderBy("nodes")
      .show(false)
    

    这是scala希望你可以在pyspark中写它 .

    希望这可以帮助!

相关问题