当我在Spark 2中检索数据集时,使用select语句,基础列继承查询列的数据类型 .
val ds1 = spark.sql("select 1 as a, 2 as b, 'abd' as c")
ds1.printSchema()
root
|-- a: integer (nullable = false)
|-- b: integer (nullable = false)
|-- c: string (nullable = false)
现在,如果我将其转换为case类,它将正确转换值,但底层架构仍然是错误的 .
case class abc(a: Double, b: Double, c: String)
val ds2 = ds1.as[abc]
ds2.printSchema()
root
|-- a: integer (nullable = false)
|-- b: integer (nullable = false)
|-- c: string (nullable = false)
ds2.collect
res18: Array[abc] = Array(abc(1.0,2.0,abd))
我“应该”能够指定在创建第二个数据集时使用的编码器,但scala似乎忽略了这个参数(这是一个BUG吗?):
val abc_enc = org.apache.spark.sql.Encoders.product[abc]
val ds2 = ds1.as[abc](abc_enc)
ds2.printSchema
root
|-- a: integer (nullable = false)
|-- b: integer (nullable = false)
|-- c: string (nullable = false)
因此,我可以看到简单地执行此操作的唯一方法是使用createDataset,但这需要对底层对象进行收集,因此它并不理想 .
val ds2 = spark.createDataset(ds1.as[abc].collect)
4 回答
这是Spark API中的一个未解决的问题(查看此票据SPARK-17694)
所以你需要做的是做一个额外的显式演员 . 这样的事情应该有效:
你可以简单地在
columns
上使用cast
方法你应该有
您也可以在使用sql查询选择时强制转换列,如下所示
这有架构为
现在您可以使用案例类转换为数据集
现在有哪个架构
希望这可以帮助!
好的,我想我已经以更好的方式解决了这个问题 .
我们可以只引用数据集的rdd,而不是在创建新数据集时使用collect .
而不是
我们用:
这使得延迟评估保持不变,但允许新数据集将encoder用于abc案例类,后续架构将在我们使用它创建新表时反映这一点 .