嗨我想在DataFrame的每一行中使用现有列添加新列,我在Spark Scala中尝试这样... df是包含可变列数的数据帧,可以仅在运行时决定 .
// Added new column "docid"
val df_new = appContext.sparkSession.sqlContext.createDataFrame(df.rdd, df.schema.add("docid", DataTypes.StringType))
df_new.map(x => {
import appContext.sparkSession.implicits._
val allVals = (0 to x.size).map(x.get(_)).toSeq
val values = allVals ++ allVals.mkString("_")
Row.fromSeq(values)
})
但这是错误的是日食本身
-
无法找到存储在数据集中的类型的编码器 . 导入spark.implicits支持原始类型(Int,String等)和产品类型(case类) . 在将来的版本中将添加对序列化其他类型的支持 .
-
没有足够的方法映射参数:(隐式证据$ 7:org.apache.spark.sql.Encoder [org.apache.spark.sql.Row])org.apache.spark.sql.Dataset [org.apache.spark . sql.Row . 未指定的值参数证据$ 7 .
请帮忙 .
1 回答
通过使用UDF和withColumn Api可以更好地完成它