首页 文章

如何将RDD [Row]转换回DataFrame [duplicate]

提问于
浏览
9

这个问题在这里已有答案:

我一直在玩转换RDD到DataFrames然后再回来 . 首先,我有一个名为dataPair的类型(Int,Int)的RDD . 然后我创建了一个带有列 Headers 的DataFrame对象:

val dataFrame = dataPair.toDF(header(0), header(1))

然后我使用以下命令将其从DataFrame转换回RDD:

val testRDD = dataFrame.rdd

返回类型为org.apache.spark.sql.Row的RDD(不是(Int,Int)) . 然后我想用.toDF将它转换回RDD,但是我收到一个错误:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

我已经尝试为testRDD定义类型Data(Int,Int)的Schema,但是我得到了类型不匹配的异常:

error: type mismatch;
found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
    val testRDD: RDD[Data] = dataFrame.rdd
                                       ^

我已经进口了

import sqlContext.implicits._

1 回答

  • 17

    要从RDD of Rows创建DataFrame,通常有两个主要选项:

    1) 您可以使用 toDF() ,可以通过 import sqlContext.implicits._ 导入 . 但是,此方法仅适用于以下类型的RDD:

    • RDD[Int]

    • RDD[Long]

    • RDD[String]

    • RDD[T <: scala.Product]

    (来源: SQLContext.implicits 对象的Scaladoc

    最后一个签名实际上意味着它可以用于元组的RDD或案例类的RDD(因为元组和案例类是scala.Product的子类) .

    因此,要将此方法用于 RDD[Row] ,您必须将其映射到 RDD[T <: scala.Product] . 这可以通过将每一行映射到自定义案例类或元组来完成,如以下代码片段所示:

    val df = rdd.map({ 
      case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
    }).toDF("col1_name", ..., "colN_name")
    

    要么

    case class MyClass(val1: String, ..., valN: Long = 0L)
    val df = rdd.map({ 
      case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
    }).toDF("col1_name", ..., "colN_name")
    

    这种方法的主要缺点(在我看来)是你必须逐列显式地设置map函数中结果DataFrame的模式 . 如果你事先不知道架构,也许这可以以编程方式完成,但事情可能会有点混乱 . 所以,或者,还有另一种选择:


    2) 您可以使用SQLContext,该对象在SQLContext对象中可用 . 例:

    val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
    

    请注意,无需显式设置任何架构列 . 我们重用了旧的DF模式,该模式属于 StructType 类,可以轻松扩展 . 然而,这种方法有时是不可能的,并且在某些情况下可能比第一种方法效率低 .

    我希望它比以前更清楚 . 干杯 .

相关问题