首页 文章

使用scala基于Spark Data DataFrame中现有列的聚合添加新列

提问于
浏览
2

我有一个像下面这样的DataFrame . 我需要根据现有列创建一个新列 .

col1 col2
a      1
a      2
b      1
c      1
d      1
d      2

输出数据框看起来像这样

col1  col2 col3 col4
a      1   1      2
a      2   1      2
b      1   0      1
c      1   0      1
d      1   1      2
d      2   1      2

我用来查找col3的逻辑是 if count of col1 > 1col4 is max value of col2 .

我熟悉如何在sql中做到这一点 . 但是很难找到数据帧DSL的解决方案 . 任何帮助,将不胜感激 . 谢谢

3 回答

  • 2

    groupBy col1和聚合得到countmax . 然后你可以用原始数据帧回复它以获得你想要的结果

    val df2 = df1.groupBy("col1").agg(count() as col3, max("col2") as col4) 
    
    val df3 = df1.join(df2, "col1")
    
  • 1

    spark df具有名为 withColumn 的属性您可以根据需要添加任意数量的派生列 . 但该列未添加到现有DF,而是创建添加了列的新DF .

    例如向数据添加静态日期

    val myFormattedData = myData.withColumn("batchdate",addBatchDate(myData("batchdate")))
    val addBatchDate = udf { (BatchDate: String) => "20160101" }
    
  • 2

    要添加col3,您可以在/时使用withcolumn:

    val df2 = df.withColumn("col3",when($"col2" > 1, 1).otherwise(0))
    

    要添加col4,已经提到的groupBy / max join应该完成这项工作:

    val df3 = df2.join(df.groupBy("col1").max("col2"), "col1")
    

相关问题