首页 文章

Generic T as Spark Dataset [T]构造函数

提问于
浏览
3

在以下代码段中, tryParquet 函数尝试从Parquet文件加载数据集(如果存在) . 如果没有,它会计算,持久并返回提供的数据集计划:

import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

sealed trait CustomRow

case class MyRow(
  id: Int,
  name: String
) extends CustomRow

val ds: Dataset[MyRow] =
  Seq((1, "foo"),
      (2, "bar"),
      (3, "baz")).toDF("id", "name").as[MyRow]

def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
    Try(session.read.parquet(path)) match {
      case Success(df) => df.as[T] // <---- compile error here
      case Failure(_)  => {
        target.write.parquet(path)
        target
      }
    }

val readyDS: Dataset[MyRow] =
    tryParquet(spark, "/path/to/file.parq", ds)

但是这会在 df.as[T] 上产生编译错误:

无法找到存储在数据集中的类型的编码器 . 导入spark.implicits支持原始类型(Int,String等)和产品类型(case类) . 在将来的版本中将添加对序列化其他类型的支持 . 案例成功(df)=> df.as [T]

可以通过使 tryParquet cast df 返回无类型的 DataFrame 并让调用者强制转换为所需的构造函数来避免此问题 . 但是,在我们希望函数内部管理类型的情况下是否有任何解决方案?

1 回答

  • 6

    通过在type参数中使用 Encoder 看起来是可能的:

    import org.apache.spark.sql.Encoder
    
    def tryParquet[T <: CustomRow: Encoder](...)
    

    这样编译器就可以证明 df.as[T] 在构造对象时提供了一个编码器 .

相关问题