首页 文章

如何在Dataset中存储自定义对象?

提问于
浏览
117

根据Introducing Spark Datasets

当我们期待Spark 2.0时,我们计划对数据集进行一些令人兴奋的改进,特别是:...自定义编码器 - 虽然我们目前自动生成各种类型的编码器,但我们想为自定义对象打开一个API .

并尝试在 Dataset 中存储自定义类型导致以下错误:

无法找到存储在数据集中的类型的编码器 . 导入sqlContext.implicits.支持原始类型(Int,String等)和产品类型(case类) . 将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到编码器....

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在 . 随意更新/改进问题和答案 .

8 回答

  • 28
    • 使用通用编码器 .

    现在有两种通用编码器kryojavaSerialization,其中后者明确描述为:

    效率极低,应该只作为最后的手段 .

    假设下课

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    您可以通过添加隐式编码器来使用这些编码器:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    可以一起使用如下:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    它将对象存储为 binary 列,因此当转换为 DataFrame 时,您将获得以下模式:

    root
     |-- value: binary (nullable = true)
    

    也可以使用 kryo 编码器为特定字段编码元组:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    请注意,我们不能使用 toDS 方法 .

    • 使用隐式转换:

    提供可编码的表示与自定义类之间的隐式转换,例如:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

    相关问题:

  • 5

    对于那些可能在我的情况下我也在这里给出答案的人 .

    再具体一点,

    • 我正在从SQLContext中读取“设置类型数据” . 因此原始数据格式是DataFrame .

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

    • 然后使用带有mutable.WrappedArray类型的rdd.map()将其转换为RDD .

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    结果:

    (1,Set(1))

  • 180

    除了已经给出的建议之外,我最近发现的另一个选项是您可以声明自定义类,包括特征 org.apache.spark.sql.catalyst.DefinedByConstructorParams .

    如果类具有使用ExpressionEncoder可以理解的类型的构造函数,即原始值和标准集合,则此方法有效 . 当您无法将类声明为案例类时,它可以派上用场,但是每次将它包含在数据集中时都不希望使用Kryo对其进行编码 .

    例如,我想声明一个包含Breeze向量的case类 . 能够处理的唯一编码器通常是Kryo . 但是如果我声明了一个扩展了Breeze DenseVector和DefinedByConstructorParams的子类,那么ExpressionEncoder就知道它可以被序列化为双打数组 .

    这是我如何宣布它:

    class SerializableDenseVector(values: Array[Double]) extends breeze.linalg.DenseVector[Double](values) with DefinedByConstructorParams
    implicit def BreezeVectorToSerializable(bv: breeze.linalg.DenseVector[Double]): SerializableDenseVector = bv.asInstanceOf[SerializableDenseVector]
    

    现在,我可以使用简单的ExpressionEncoder而不是Kryo在数据集中使用 SerializableDenseVector (直接或作为产品的一部分) . 它就像Breeze DenseVector一样工作,但序列化为一个数组[Double] .

  • 3

    更新

    这个答案仍然有效且信息丰富,尽管自2.2 / 2.3以来现在情况更好,它为 SetSeqMapDateTimestampBigDecimal 增加了内置编码器支持 . 如果您坚持只使用case类和通常的Scala类型创建类型,那么你应该没有 SQLImplicits 中的隐含 .


    不幸的是,几乎没有添加任何东西来帮助解决这个问题在Encoders.scalaSQLImplicits.scala中搜索 @since 2.0.0 主要与原始类型(以及案例类的一些调整)有关 . 所以,首先要说的是: there currently is no real good support for custom class encoders . 除此之外,接下来的是一些技巧,这些技巧可以做到我们所希望的那样好的工作,考虑到我们目前拥有的东西 . 作为一项前期免责声明:我们尽最大努力使所有限制清晰明确 .

    究竟是什么问题

    当您想要创建数据集时,Spark“需要一个编码器(将T类型的JVM对象转换为内部Spark SQL表示形式),这通常是通过 SparkSession 的隐含自动创建的,或者可以通过调用static显式创建 Encoders 上的方法“(取自docs on createDataset) . 编码器将采用 Encoder[T] 形式,其中 T 是您要编码的类型 . 第一个建议是添加 import spark.implicits._ (它给你these隐式编码器),第二个建议是使用this编码器相关函数集显式传入隐式编码器 .

    普通 class 没有编码器,所以

    import spark.implicits._
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    将为您提供以下隐式相关编译时错误:

    无法找到存储在数据集中的类型的编码器 . 导入sqlContext.implicits.支持原始类型(Int,String等)和产品类型(case类) . 将来会添加对序列化其他类型的支持发布

    但是,如果您在某些扩展 Product 的类中包装您刚才用于获取上述错误的任何类型,则该错误会导致运行时间延迟,因此

    import spark.implicits._
    case class Wrap[T](unwrap: T)
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
    

    编译得很好,但在运行时失败了

    java.lang.UnsupportedOperationException:找不到MyObj的编码器

    这样做的原因是Spark使用implicits创建的编码器实际上只在运行时创建(通过scala relfection) . 在这种情况下,编译时的所有Spark检查都是最外层的类扩展 Product (所有案例类都这样做),并且只在运行时实现它仍然不知道如何处理 MyObj (如果我尝试的话会出现同样的问题)制作一个 Dataset[(Int,MyObj)] - Spark等待运行时到 MyObj 的barf) . 这些是迫切需要修复的核心问题:

    • 一些扩展 Product 编译的类,尽管总是在运行时崩溃

    • 无法为嵌套类型传递自定义编码器(我无法为 MyObj 提供Spark编码器,以便它知道如何编码 Wrap[MyObj](Int,MyObj) ) .

    只需使用kryo

    每个人建议的解决方案是使用kryo编码器 .

    import spark.implicits._
    class MyObj(val i: Int)
    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    但这很快就变得非常繁琐 . 特别是如果你的代码正在操纵各种数据集,加入,分组等等 . 你最终会产生一些额外的暗示 . 那么,为什么不做一个隐含的自动完成这一切呢?

    import scala.reflect.ClassTag
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
      org.apache.spark.sql.Encoders.kryo[A](ct)
    

    现在,似乎我几乎可以做任何我想做的事情(以下示例将无法在 spark-shell 中自动导入 spark.implicits._

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
    val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
    

    或差不多 . 问题是使用 kryo 导致Spark只是将数据集中的每一行存储为平面二进制对象 . 对于 mapfilterforeach 这已足够,但对于像 join 这样的操作,Spark确实需要将它们分成列 . 检查 d2d3 的架构,您会看到只有一个二进制列:

    d2.printSchema
    // root
    //  |-- value: binary (nullable = true)
    

    元组的部分解决方案

    因此,使用Scala中的implicits(更多内容在6.26.3 Overloading Resolution中),我可以使自己成为一系列暗示,尽可能做好工作,至少对于元组来说,并且可以很好地处理现有的含义:

    import org.apache.spark.sql.{Encoder,Encoders}
    import scala.reflect.ClassTag
    import spark.implicits._  // we can still take advantage of all the old implicits
    
    implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
    
    implicit def tuple2[A1, A2](
      implicit e1: Encoder[A1],
               e2: Encoder[A2]
    ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
    
    implicit def tuple3[A1, A2, A3](
      implicit e1: Encoder[A1],
               e2: Encoder[A2],
               e3: Encoder[A3]
    ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
    
    // ... you can keep making these
    

    然后,有了这些暗示,我可以让我的例子在上面工作,尽管有一些列重命名

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
    val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
    

    我还没有弄清楚如何在没有重命名的情况下获得预期的元组名称( _1_2 ,...) - 如果其他人想要玩这个,this"value" 的名称被引入的地方this是通常添加元组名称的位置 . 但是,关键是我现在有一个很好的结构化架构:

    d4.printSchema
    // root
    //  |-- _1: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    //  |-- _2: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    

    总而言之,这个解决方法:

    • 允许我们为元组获取单独的列(所以我们可以再次加入元组,是的!)

    • 我们可以再次依赖于implicits(所以不需要在整个地方传递 kryo

    • 几乎完全向后兼容 import spark.implicits._ (涉及一些重命名)

    • 不允许我们加入 kyro 序列化二进制列,更不用说那些可能有的字段了

    • 有一个令人不快的副作用,即将一些元组列重命名为"value"(如果需要,可以通过转换 .toDF ,指定新列名称并转换回数据集来撤消 - 并且模式名称似乎通过连接保留,他们最需要的地方) .

    一般类的部分解决方案

    这个不太愉快,没有好的解决方案 . 但是,现在我们已经有了上面的元组解决方案,我有一个预感,另一个答案的隐式转换解决方案也会有点痛苦,因为你可以将更复杂的类转换为元组 . 然后,在创建数据集之后,您可能会使用数据框方法重命名列 . 如果一切顺利,这实际上是一个改进,因为我现在可以在我的类的字段上执行连接 . 如果我刚刚使用了一个不可能的平面二进制 kryo 序列化器 .

    这是一个做一些事情的例子:我有一个类 MyObj ,它有 Intjava.util.UUIDSet[String] 类型的字段 . 第一个照顾自己 . 第二个,虽然我可以使用 kryo 进行序列化,如果存储为 String 会更有用(因为 UUID s通常是我想加入的东西) . 第三个真正属于二进制列 .

    class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
    
    // alias for the type to convert to and from
    type MyObjEncoded = (Int, String, Set[String])
    
    // implicit conversions
    implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
    implicit def fromEncoded(e: MyObjEncoded): MyObj =
      new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
    

    现在,我可以使用这个机器创建一个具有良好模式的数据集:

    val d = spark.createDataset(Seq[MyObjEncoded](
      new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
      new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
    )).toDF("i","u","s").as[MyObjEncoded]
    

    模式向我展示了具有正确名称的列和前两个我可以加入的对象 .

    d.printSchema
    // root
    //  |-- i: integer (nullable = false)
    //  |-- u: string (nullable = true)
    //  |-- s: binary (nullable = true)
    
  • 1

    编码器在 Spark2.0 中的工作方式大致相同 . 并且 Kryo 仍然是推荐的 serialization 选择 .

    您可以使用spark-shell查看以下示例

    scala> import spark.implicits._
    import spark.implicits._
    
    scala> import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.Encoders
    
    scala> case class NormalPerson(name: String, age: Int) {
     |   def aboutMe = s"I am ${name}. I am ${age} years old."
     | }
    defined class NormalPerson
    
    scala> case class ReversePerson(name: Int, age: String) {
     |   def aboutMe = s"I am ${name}. I am ${age} years old."
     | }
    defined class ReversePerson
    
    scala> val normalPersons = Seq(
     |   NormalPerson("Superman", 25),
     |   NormalPerson("Spiderman", 17),
     |   NormalPerson("Ironman", 29)
     | )
    normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))
    
    scala> val ds1 = sc.parallelize(normalPersons).toDS
    ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]
    
    scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
    ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
    
    scala> ds1.show()
    +---------+---+
    |     name|age|
    +---------+---+
    | Superman| 25|
    |Spiderman| 17|
    |  Ironman| 29|
    +---------+---+
    
    scala> ds2.show()
    +----+---------+
    |name|      age|
    +----+---------+
    |  25| Superman|
    |  17|Spiderman|
    |  29|  Ironman|
    +----+---------+
    
    scala> ds1.foreach(p => println(p.aboutMe))
    I am Ironman. I am 29 years old.
    I am Superman. I am 25 years old.
    I am Spiderman. I am 17 years old.
    
    scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
    ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
    
    scala> ds2.foreach(p => println(p.aboutMe))
    I am 17. I am Spiderman years old.
    I am 25. I am Superman years old.
    I am 29. I am Ironman years old.
    

    直到现在]目前范围内没有 appropriate encoders 所以我们的人没有被编码为 binary 值 . 但是一旦我们改变了使用 Kryo 序列化提供一些 implicit 编码器 .

    // Provide Encoders
    
    scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
    normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]
    
    scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
    reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]
    
    // Ecoders will be used since they are now present in Scope
    
    scala> val ds3 = sc.parallelize(normalPersons).toDS
    ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]
    
    scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
    ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]
    
    // now all our persons show up as binary values
    scala> ds3.show()
    +--------------------+
    |               value|
    +--------------------+
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    +--------------------+
    
    scala> ds4.show()
    +--------------------+
    |               value|
    +--------------------+
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    |[01 00 24 6C 69 6...|
    +--------------------+
    
    // Our instances still work as expected    
    
    scala> ds3.foreach(p => println(p.aboutMe))
    I am Ironman. I am 29 years old.
    I am Spiderman. I am 17 years old.
    I am Superman. I am 25 years old.
    
    scala> ds4.foreach(p => println(p.aboutMe))
    I am 25. I am Superman years old.
    I am 29. I am Ironman years old.
    I am 17. I am Spiderman years old.
    
  • 1

    您可以使用UDTRegistration,然后使用Case Classes,Tuples等...所有这些都可以正常使用您的用户定义类型!

    Say you want to use a custom Enum:

    trait CustomEnum { def value:String }
    case object Foo extends CustomEnum  { val value = "F" }
    case object Bar extends CustomEnum  { val value = "B" }
    object CustomEnum {
      def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
    }
    

    像这样注册:

    // First define a UDT class for it:
    class CustomEnumUDT extends UserDefinedType[CustomEnum] {
      override def sqlType: DataType = org.apache.spark.sql.types.StringType
      override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
      // Note that this will be a UTF8String type
      override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
      override def userClass: Class[CustomEnum] = classOf[CustomEnum]
    }
    
    // Then Register the UDT Class!
    // NOTE: you have to put this file into the org.apache.spark package!
    UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
    

    然后用它!

    case class UsingCustomEnum(id:Int, en:CustomEnum)
    
    val seq = Seq(
      UsingCustomEnum(1, Foo),
      UsingCustomEnum(2, Bar),
      UsingCustomEnum(3, Foo)
    ).toDS()
    seq.filter(_.en == Foo).show()
    println(seq.collect())
    

    Say you want to use a Polymorphic Record:

    trait CustomPoly
    case class FooPoly(id:Int) extends CustomPoly
    case class BarPoly(value:String, secondValue:Long) extends CustomPoly
    

    ......并且像这样使用它:

    case class UsingPoly(id:Int, poly:CustomPoly)
    
    Seq(
      UsingPoly(1, new FooPoly(1)),
      UsingPoly(2, new BarPoly("Blah", 123)),
      UsingPoly(3, new FooPoly(1))
    ).toDS
    
    polySeq.filter(_.poly match {
      case FooPoly(value) => value == 1
      case _ => false
    }).show()
    

    您可以编写一个自定义UDT,将所有内容编码为字节(我在这里使用java序列化,但最好是使用Spark的Kryo上下文) .

    首先定义UDT类:

    class CustomPolyUDT extends UserDefinedType[CustomPoly] {
      val kryo = new Kryo()
    
      override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
      override def serialize(obj: CustomPoly): Any = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
    
        bos.toByteArray
      }
      override def deserialize(datum: Any): CustomPoly = {
        val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
        val ois = new ObjectInputStream(bis)
        val obj = ois.readObject()
        obj.asInstanceOf[CustomPoly]
      }
    
      override def userClass: Class[CustomPoly] = classOf[CustomPoly]
    }
    

    然后注册:

    // NOTE: The file you do this in has to be inside of the org.apache.spark package!
    UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
    

    然后你可以使用它!

    // As shown above:
    case class UsingPoly(id:Int, poly:CustomPoly)
    
    Seq(
      UsingPoly(1, new FooPoly(1)),
      UsingPoly(2, new BarPoly("Blah", 123)),
      UsingPoly(3, new FooPoly(1))
    ).toDS
    
    polySeq.filter(_.poly match {
      case FooPoly(value) => value == 1
      case _ => false
    }).show()
    
  • 6

    我的例子将是Java,但我认为很难适应Scala .

    只要 Fruit 是一个简单的Java Bean,我就使用spark.createDatasetEncoders.beanRDD<Fruit> 转换为 Dataset<Fruit> 非常成功 .

    Step 1: Create the simple Java Bean.

    public class Fruit implements Serializable {
        private String name  = "default-fruit";
        private String color = "default-color";
    
        // AllArgsConstructor
        public Fruit(String name, String color) {
            this.name  = name;
            this.color = color;
        }
    
        // NoArgsConstructor
        public Fruit() {
            this("default-fruit", "default-color");
        }
    
        // ...create getters and setters for above fields
        // you figure it out
    }
    

    在DataBricks人们加强他们的编码器之前,我会坚持使用原始类型和String作为字段的类 . 如果您有一个具有嵌套对象的类,则创建另一个简单的Java Bean,其所有字段都被展平,因此您可以使用RDD转换将复杂类型映射到更简单的类型 . 当然's a little extra work, but I imagine it' ll对使用平面模式的性能有很大帮助 .

    Step 2: Get your Dataset from the RDD

    SparkSession spark = SparkSession.builder().getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext();
    
    List<Fruit> fruitList = ImmutableList.of(
        new Fruit("apple", "red"),
        new Fruit("orange", "orange"),
        new Fruit("grape", "purple"));
    JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);
    
    
    RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
    Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
    Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
    

    瞧!泡沫,冲洗,重复 .

  • 0

    对于Java Bean类,这可能很有用

    import spark.sqlContext.implicits._
    import org.apache.spark.sql.Encoders
    implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
    

    现在,您只需将dataFrame读取为自定义DataFrame即可

    dataFrame.as[MyClass]
    

    这将创建一个自定义类编码器而不是二进制编码器 .

相关问题