当我试图在我的代码中做同样的事情,如下所述
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
我从这里采取了上述参考:Scala: How can I replace value in Dataframs using scala但我得到编码器错误
无法找到存储在数据集中的类型的编码器 . 导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持 .
注意:我正在使用spark 2.0!
2 回答
这里没有什么意外的 . 您正在尝试使用已使用Spark 1.x编写且Spark 2.0不再支持的代码:
in 1.x
DataFrame.map
是((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
in 2.x
Dataset[Row].map
是((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
说实话,它在1.x中也没有多大意义 . 独立于版本,您只需使用
DataFrame
API:如果你真的想使用
map
,你应该使用静态类型Dataset
:或者至少返回一个具有隐式编码器的对象:
最后,如果因为某些原因你真的要映射
Dataset[Row]
,你必须提供所需的编码器:对于预先知道数据帧模式的情况,@ zero323给出的答案是解决方案
但是对于具有动态模式/或将多个数据帧传递给泛型函数的场景:以下代码对我们起作用,同时从1.5.0迁移到2.2.0
此代码在两个版本的spark上执行 .
缺点:数据帧/数据集api上的spark提供的优化不会被应用 .