这个问题在这里已有答案:
我一直在玩转换RDD到DataFrames然后再回来 . 首先,我有一个名为dataPair的类型(Int,Int)的RDD . 然后我创建了一个带有列 Headers 的DataFrame对象:
val dataFrame = dataPair.toDF(header(0), header(1))
然后我使用以下命令将其从DataFrame转换回RDD:
val testRDD = dataFrame.rdd
返回类型为org.apache.spark.sql.Row的RDD(不是(Int,Int)) . 然后我想用.toDF将它转换回RDD,但是我收到一个错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
我已经尝试为testRDD定义类型Data(Int,Int)的Schema,但是我得到了类型不匹配的异常:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
我已经进口了
import sqlContext.implicits._
1 回答
要从RDD of Rows创建DataFrame,通常有两个主要选项:
1) 您可以使用
toDF()
,可以通过import sqlContext.implicits._
导入 . 但是,此方法仅适用于以下类型的RDD:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(来源:
SQLContext.implicits
对象的Scaladoc)最后一个签名实际上意味着它可以用于元组的RDD或案例类的RDD(因为元组和案例类是scala.Product的子类) .
因此,要将此方法用于
RDD[Row]
,您必须将其映射到RDD[T <: scala.Product]
. 这可以通过将每一行映射到自定义案例类或元组来完成,如以下代码片段所示:要么
这种方法的主要缺点(在我看来)是你必须逐列显式地设置map函数中结果DataFrame的模式 . 如果你事先不知道架构,也许这可以以编程方式完成,但事情可能会有点混乱 . 所以,或者,还有另一种选择:
2) 您可以使用SQLContext,该对象在SQLContext对象中可用 . 例:
请注意,无需显式设置任何架构列 . 我们重用了旧的DF模式,该模式属于
StructType
类,可以轻松扩展 . 然而,这种方法有时是不可能的,并且在某些情况下可能比第一种方法效率低 .我希望它比以前更清楚 . 干杯 .