我们正在努力将Spark DataFrame转换为具有动态列的case类的DataSet . 每个用例的起点都是DataFrame,如下所示:

root
|-- id: string (nullable = true)
|-- time: long (nullable = true)
|-- c: struct (nullable = true)
|    |-- d: long (nullable = true)
|    |-- e: string (nullable = true)
|    |-- f: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- g: long (nullable = true)
|-- x: string (nullable = true)
|-- y: string (nullable = true)

所有这些DataFrame都有 idtime 列 . 其他列( cxy )是可配置的,可以根据计数中的每个用例(在某些DataFrame中,我们在 idtime 旁边最多包含10列)和类型进行更改 .

为了简化这些事件的复杂处理,我们希望将每一行转换为scala case类,如下所示:

case class Event(id: String, time: Long, payload: Seq[Any])

要么:

case class Event(id: String, time: Long, payload: Map[String, Any]) // payload: <column-name>/<value>

可悲的是, df.as[Event] 无法开箱即用 . 任何想法,如何做到这一点?

为每个用例编写案例类都不是一种选择,因为我们只想通过YAML文件配置作业,而不想为每个新用例调整代码 . 我们需要更通用的东西! (我考虑过在运行时生成一个case类,但这很复杂......)

谢谢你的帮助!

Update - Solution

我们现在想出了以下解决方案:

case class Event(id:String,time:Long,payload:Seq [Array [Byte]])

并在 kryo 的帮助下转换DataFrame:

val idIndex = df.columns.indexOf("id")
val timeIndex = df.columns.indexOf("time")
val otherColumns = List.range(0, df.columns.length).filterNot(i => i == idIndex && i == timeIndex)

val kryo = new Kryo()

val ds = df.map(row => {
  Event(
    row.getAs[String](idIndex),
    row.getAs[Long](timeIndex),
    otherColumns.map(index => {
      val output = new Output(192, 8192)
      kryo.writeObject(output, row.get(index))
      output.getBuffer
    })
  )
})

ds.printSchema()

谢谢你的帮助 .