首页 文章

使用Spark Scala使用现有列添加新列

提问于
浏览
1

嗨我想在DataFrame的每一行中使用现有列添加新列,我在Spark Scala中尝试这样... df是包含可变列数的数据帧,可以仅在运行时决定 .

// Added new column "docid"
val df_new = appContext.sparkSession.sqlContext.createDataFrame(df.rdd, df.schema.add("docid", DataTypes.StringType))

 df_new.map(x => {
        import appContext.sparkSession.implicits._
      val allVals = (0 to x.size).map(x.get(_)).toSeq
      val values = allVals ++ allVals.mkString("_") 
      Row.fromSeq(values)
    })

但这是错误的是日食本身

  • 无法找到存储在数据集中的类型的编码器 . 导入spark.implicits支持原始类型(Int,String等)和产品类型(case类) . 在将来的版本中将添加对序列化其他类型的支持 .

  • 没有足够的方法映射参数:(隐式证据$ 7:org.apache.spark.sql.Encoder [org.apache.spark.sql.Row])org.apache.spark.sql.Dataset [org.apache.spark . sql.Row . 未指定的值参数证据$ 7 .

请帮忙 .

1 回答

  • 0

    通过使用UDF和withColumn Api可以更好地完成它

相关问题