首页 文章

如何更改Spark数据集上的架构

提问于
浏览
2

当我在Spark 2中检索数据集时,使用select语句,基础列继承查询列的数据类型 .

val ds1 = spark.sql("select 1 as a, 2 as b, 'abd' as c")

ds1.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

现在,如果我将其转换为case类,它将正确转换值,但底层架构仍然是错误的 .

case class abc(a: Double, b: Double, c: String)
val ds2 = ds1.as[abc]
ds2.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

ds2.collect
res18: Array[abc] = Array(abc(1.0,2.0,abd))

我“应该”能够指定在创建第二个数据集时使用的编码器,但scala似乎忽略了这个参数(这是一个BUG吗?):

val abc_enc = org.apache.spark.sql.Encoders.product[abc]

val ds2 = ds1.as[abc](abc_enc)

ds2.printSchema
root
 |-- a: integer (nullable = false)
 |-- b: integer (nullable = false)
 |-- c: string (nullable = false)

因此,我可以看到简单地执行此操作的唯一方法是使用createDataset,但这需要对底层对象进行收集,因此它并不理想 .

val ds2 = spark.createDataset(ds1.as[abc].collect)

4 回答

  • 1

    这是Spark API中的一个未解决的问题(查看此票据SPARK-17694

    所以你需要做的是做一个额外的显式演员 . 这样的事情应该有效:

    ds1.as[abc].map(x => x : abc)
    
  • 0

    你可以简单地在 columns 上使用 cast 方法

    import sqlContext.implicits._
    val ds2 = ds1.select($"a".cast(DoubleType), $"a".cast(DoubleType), $"c")
    ds2.printSchema()
    

    你应该有

    root
     |-- a: double (nullable = false)
     |-- a: double (nullable = false)
     |-- c: string (nullable = false)
    
  • -1

    您也可以在使用sql查询选择时强制转换列,如下所示

    import spark.implicits._
    
    val ds = Seq((1,2,"abc"),(1,2,"abc")).toDF("a", "b","c").createOrReplaceTempView("temp")
    
    val ds1 = spark.sql("select cast(a as Double) , cast (b as Double), c from temp")
    
    ds1.printSchema()
    

    这有架构为

    root
     |-- a: double (nullable = false)
     |-- b: double (nullable = false)
     |-- c: string (nullable = true)
    

    现在您可以使用案例类转换为数据集

    case class abc(a: Double, b: Double, c: String)
    
    val ds2 = ds1.as[abc]
    ds2.printSchema()
    

    现在有哪个架构

    root
     |-- a: double (nullable = false)
     |-- b: double (nullable = false)
     |-- c: string (nullable = true)
    

    希望这可以帮助!

  • -1

    好的,我想我已经以更好的方式解决了这个问题 .

    我们可以只引用数据集的rdd,而不是在创建新数据集时使用collect .

    而不是

    val ds2 = spark.createDataset(ds1.as[abc].collect)
    

    我们用:

    val ds2 = spark.createDataset(ds1.as[abc].rdd)
    
    ds2.printSchema
    root
     |-- a: double (nullable = false)
     |-- b: double (nullable = false)
     |-- c: string (nullable = true)
    

    这使得延迟评估保持不变,但允许新数据集将encoder用于abc案例类,后续架构将在我们使用它创建新表时反映这一点 .

相关问题