当我们期待Spark 2.0时,我们计划对数据集进行一些令人兴奋的改进,特别是:...自定义编码器 - 虽然我们目前自动生成各种类型的编码器,但我们想为自定义对象打开一个API .
并尝试在 Dataset
中存储自定义类型导致以下错误:
无法找到存储在数据集中的类型的编码器 . 导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类) . 将来版本中将添加对序列化其他类型的支持
要么:
Java.lang.UnsupportedOperationException:找不到编码器....
有没有现成的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在 . 随意更新/改进问题和答案 .
8 回答
对于Java Bean类,这可能很有用
现在,您只需将dataFrame读取为自定义DataFrame即可
这将创建一个自定义类编码器,而不是二进制编码器 .
更新
这个答案仍然有效且信息丰富,尽管自2.2 / 2.3以来现在情况更好,它为
Set
,_Seq
,Map
,Date
,Timestamp
和BigDecimal
增加了内置编码器支持 . 如果你坚持只使用case类和通常的Scala类型来制作类型,你应该没有SQLImplicits
中的隐含 .不幸的是,几乎没有添加任何东西来帮助解决这个问题在Encoders.scala或SQLImplicits.scala中搜索
@since 2.0.0
主要与原始类型(以及案例类的一些调整)有关 . 所以,首先要说的是: there currently is no real good support for custom class encoders . 除此之外,接下来的是一些技巧,这些技巧可以做到我们所希望的那样好的工作,因为我们现在可以使用它们 . 作为一项前期免责声明:我们尽最大努力使所有限制清晰明确 .究竟是什么问题
当您想要创建数据集时,Spark“需要一个编码器(将T类型的JVM对象转换为内部Spark SQL表示形式),这通常是通过来自
SparkSession
的implicits自动创建的,或者可以通过调用static来显式创建Encoders
上的方法“(取自docs on createDataset) . 编码器将采用Encoder[T]
形式,其中T
是您要编码的类型 . 第一个建议是添加import spark.implicits._
(它给你these隐式编码器),第二个建议是使用this编码器相关函数集显式传入隐式编码器 .普通 class 没有编码器,所以
将为您提供以下隐式相关编译时错误:
但是,如果您在某些扩展
Product
的类中包装您刚才用于获取上述错误的任何类型,则该错误会令人困惑地延迟到运行时,因此编译得很好,但在运行时失败了
这样做的原因是Spark使用implicits创建的编码器实际上只在运行时创建(通过scala relfection) . 在这种情况下,编译时的所有Spark检查都是最外层的类扩展
Product
(所有案例类都这样做),并且只在运行时实现它仍然不知道如何处理MyObj
(如果我尝试的话会出现同样的问题)制作一个Dataset[(Int,MyObj)]
- Spark等待运行时到MyObj
上的barf) . 这些是迫切需要修复的核心问题:一些扩展
Product
尽管总是在运行时崩溃的类没有办法为嵌套类型传递自定义编码器(我无法为
MyObj
提供Spark编码器,以便它知道如何编码Wrap[MyObj]
或(Int,MyObj)
) .只需使用kryo
每个人都建议的解决方案是使用kryo编码器 .
尽管如此,这变得相当繁琐 . 特别是如果你的代码正在操纵各种数据集,加入,分组等等 . 你最终会产生一些额外的暗示 . 那么,为什么不做一个隐含的自动完成这一切呢?
现在,似乎我几乎可以做任何我想做的事情(以下示例将无法在
spark-shell
中自动导入spark.implicits._
)或差不多 . 问题是使用
kryo
会导致Spark只是将数据集中的每一行存储为平面二进制对象 . 对于map
,filter
,foreach
这已足够,但对于像join
这样的操作,Spark确实需要将这些分成列 . 检查d2
或d3
的架构,您会看到只有一个二进制列:元组的部分解决方案
因此,使用Scala中的隐含魔法(更多内容在6.26.3 Overloading Resolution中),我可以使自己成为一系列暗示,尽可能做好工作,至少对于元组来说,并且可以很好地处理现有的含义:
然后,有了这些暗示,我可以让我的例子在上面工作,尽管有一些列重命名
我还没有想出如何在没有重命名的情况下获得预期的元组名称(
_1
,_2
,...) - 如果有人想要玩这个,this是引入名称"value"
的地方this是通常添加元组名称的位置 . 但是,关键是我现在有一个很好的结构化架构:总而言之,这个解决方法:
允许我们为元组获取单独的列(因此我们可以再次加入元组,是的!)
我们可以再次依赖于implicits(所以不需要在整个地方传递
kryo
)几乎完全向后兼容
import spark.implicits._
(涉及一些重命名)不允许我们加入
kyro
序列化二进制列,更不用说那些可能有的字段了有一个令人不快的副作用,即将一些元组列重命名为"value"(如果需要,可以通过转换
.toDF
,指定新列名称并转换回数据集来撤消 - 并且模式名称似乎通过连接保留,他们最需要的地方) .一般类的部分解决方案
这个不太愉快,没有好的解决方案 . 但是,现在我们已经有了上面的元组解决方案,我有一个预感,另一个答案的隐式转换解决方案也会有点痛苦,因为你可以将更复杂的类转换为元组 . 然后,在创建数据集之后,您可能会使用数据框方法重命名列 . 如果一切顺利,这实际上是一个改进,因为我现在可以在我的类的字段上执行连接 . 如果我刚刚使用了一个不可能的平面二进制
kryo
序列化器 .这是一个做一些事情的例子:我有一个类
MyObj
,其字段类型为Int
,java.util.UUID
和Set[String]
. 第一个照顾自己 . 第二个,虽然我可以使用kryo
进行序列化,如果存储为String
会更有用(因为UUID
s通常是我想加入的东西) . 第三个真正属于二进制列 .现在,我可以使用这个机制创建一个具有良好模式的数据集:
模式向我展示了具有正确名称的列和前两个我可以加入的对象 .
现在有两种通用编码器kryo和javaSerialization,其中后者明确描述为:
假设下课
您可以通过添加隐式编码器来使用这些编码器:
可以一起使用如下:
它将对象存储为
binary
列,因此当转换为DataFrame
时,您将获得以下模式:也可以使用
kryo
编码器为特定字段编码元组:请注意,我们不能使用
toDS
方法 .提供可编码的表示与自定义类之间的隐式转换,例如:
相关问题:
您可以使用UDTRegistration,然后使用Case Classes,Tuples等...所有这些都可以正常使用您的用户定义类型!
Say you want to use a custom Enum:
像这样注册:
然后用它!
Say you want to use a Polymorphic Record:
......并且像这样使用它:
您可以编写一个自定义UDT,将所有内容编码为字节(我在这里使用java序列化,但最好是使用Spark的Kryo上下文) .
首先定义UDT类:
然后注册:
那你就可以用了!
编码器在
Spark2.0
中的工作方式大致相同 . 和Kryo
仍然是推荐的serialization
选择 .您可以使用spark-shell查看以下示例
直到现在]目前没有
appropriate encoders
所以我们的人员没有被编码为binary
值 . 但是,一旦我们使用Kryo
序列化提供一些implicit
编码器,这将会改变 .我的示例将使用Java,但我认为很难适应Scala .
只要
Fruit
是一个简单的Java Bean,我就已经非常成功地使用spark.createDataset和Encoders.bean将RDD<Fruit>
转换为Dataset<Fruit>
.Step 1: Create the simple Java Bean.
在DataBricks人员加强他们的编码器之前,我会坚持使用原始类型和String作为字段的类 . 如果您有一个具有嵌套对象的类,则创建另一个简单的Java Bean,其所有字段都被展平,因此您可以使用RDD转换将复杂类型映射到更简单的类型 . 当然,对于使用平面架构的性能,它有很大的帮助 .
Step 2: Get your Dataset from the RDD
瞧!泡沫,冲洗,重复 .
对于那些可能在我的情况下我也在这里给出答案的人 .
再具体一点,
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
结果:
(1,Set(1))
除了已经给出的建议之外,我最近发现的另一个选项是你可以声明你的自定义类,包括特征
org.apache.spark.sql.catalyst.DefinedByConstructorParams
.如果类具有使用ExpressionEncoder可以理解的类型的构造函数,即原始值和标准集合,则此方法有效 . 当您无法将类声明为案例类时,它可以派上用场,但是每次将它包含在数据集中时都不希望使用Kryo对其进行编码 .
例如,我想声明一个包含Breeze向量的case类 . 能够处理的唯一编码器通常是Kryo . 但是如果我声明了一个扩展了Breeze DenseVector和DefinedByConstructorParams的子类,那么ExpressionEncoder就知道它可以被序列化为双打数组 .
这是我如何宣布它:
现在,我可以使用简单的ExpressionEncoder而不是Kryo在数据集中使用
SerializableDenseVector
(直接或作为产品的一部分) . 它就像一个Breeze DenseVector,但序列化为一个数组[双] .