首页 文章

尝试将数据帧行映射到更新行时出现编码器错误

提问于
浏览
25

当我试图在我的代码中做同样的事情,如下所述

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

我从这里采取了上述参考:Scala: How can I replace value in Dataframs using scala但我得到编码器错误

无法找到存储在数据集中的类型的编码器 . 导入spark.im plicits支持原始类型(Int,S tring等)和产品类型(case类)._将在以后的版本中添加对序列化其他类型的支持 .

注意:我正在使用spark 2.0!

2 回答

  • 57

    这里没有什么意外的 . 您正在尝试使用已使用Spark 1.x编写且Spark 2.0不再支持的代码:

    • in 1.x DataFrame.map((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]

    • in 2.x Dataset[Row].map((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

    说实话,它在1.x中也没有多大意义 . 独立于版本,您只需使用 DataFrame API:

    import org.apache.spark.sql.functions.{when, lower}
    
    val df = Seq(
      (2012, "Tesla", "S"), (1997, "Ford", "E350"),
      (2015, "Chevy", "Volt")
    ).toDF("year", "make", "model")
    
    df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
    

    如果你真的想使用 map ,你应该使用静态类型 Dataset

    import spark.implicits._
    
    case class Record(year: Int, make: String, model: String)
    
    df.as[Record].map {
      case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
      case rec => rec
    }
    

    或者至少返回一个具有隐式编码器的对象:

    df.map {
      case Row(year: Int, make: String, model: String) => 
        (year, if(make.toLowerCase == "tesla") "S" else make, model)
    }
    

    最后,如果因为某些原因你真的要映射 Dataset[Row] ,你必须提供所需的编码器:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    // Yup, it would be possible to reuse df.schema here
    val schema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("make", StringType),
      StructField("model", StringType)
    ))
    
    val encoder = RowEncoder(schema)
    
    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
        Row(year, "S", model)
      case row => row
    } (encoder)
    
  • 4

    对于预先知道数据帧模式的情况,@ zero323给出的答案是解决方案

    但是对于具有动态模式/或将多个数据帧传递给泛型函数的场景:以下代码对我们起作用,同时从1.5.0迁移到2.2.0

    import org.apache.spark.sql.Row
    
    val df = Seq(
       (2012, "Tesla", "S"), (1997, "Ford", "E350"),
       (2015, "Chevy", "Volt")
     ).toDF("year", "make", "model")
    
    val data = df.rdd.map(row => {
      val row1 = row.getAs[String](1)
      val make = if (row1.toLowerCase == "tesla") "S" else row1
      Row(row(0),make,row(2))
    })
    

    此代码在两个版本的spark上执行 .

    缺点:数据帧/数据集api上的spark提供的优化不会被应用 .

相关问题