首页 文章

如何在spark sql聚合中添加三个整数列

提问于
浏览
1

我遇到的一个问题是Spark sql聚合 . 我有一个数据帧,我正在从apache phoenix加载记录 .

val df = sqlContext.phoenixTableAsDataFrame(
  Metadata.tables(A.Test), Seq("ID", "date", "col1", "col2","col3"),
  predicate = Some("\"date\" = " + date), zkUrl = Some(zkURL))

在另一个数据帧中,我需要根据ID和日期进行汇总,然后求和col1,col2,col3,即

val df1 = df.groupBy($"ID", $"date").agg(
  sum($"col1" + $"col2" + $"col3").alias("col4"))

但是在做总结时我得到的结果不正确 . 我们如何将所有列(col1,col2,col3)相加并将其分配给col4?

例:

假设数据是这样的:

ID,date,col1,col2,col3
1,2017-01-01,5,10,12
2,2017-01-01,6,9,17
3,2017-01-01,2,3,7
4,2017-01-01,5,11,13

预期产量:

ID,date,col4 
1,2017-01-01,27
2,2017-01-01,32
3,2017-01-01,12
4,2017-01-01,29

1 回答

  • -1

    我用这段代码得到了正确的结果:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.functions.{col, sum}
    import org.apache.spark.sql.types.{IntegerType,  StructField, StructType}
    
      val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
        Seq(
          Row(1, 1, 5, 10, 12 ),
          Row(2, 1, 6, 9,  17 ),
          Row(3, 1, 2, 3,  7),
          Row(4, 1, 5, 11, 13)
    
        )
      )
    
      val schema: StructType = new StructType()
        .add(StructField("id",    IntegerType,  false))
        .add(StructField("date",  IntegerType, false))
        .add(StructField("col1",  IntegerType, false))
        .add(StructField("col2",  IntegerType, false))
        .add(StructField("col3",  IntegerType, false))
      val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)
    
      val df = df0.groupBy(col("id"), col("date")).agg(sum(col("col1") + col("col2") + col("col3")).alias("col4")).sort("id")
    
      df.show()
    

    结果是:

    +---+----+----+
    | id|date|col4|
    +---+----+----+
    |  1|   1|  27|
    |  2|   1|  32|
    |  3|   1|  12|
    |  4|   1|  29|
    +---+----+----+
    

    这是你需要的吗?

相关问题