Converting an RDD to a DataFrame in Spark / Scala

An RDD has been created in the format Array[Array[String]] and has the following values:

Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1))

I want to create a DataFrame with the schema:

val schemaString = "callId oCallId callTime duration calltype swId"

Next step:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

which gives the following error:

<console>:45: error: overloaded method value createDataFrame with alternatives:
  (rdd: org.apache.spark.api.java.JavaRDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_], beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row], schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[Array[String]], org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

3 Answers

  • 4

    Just paste this into the spark-shell:

    val a = 
      Array(
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
        Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))
    
    val rdd = sc.makeRDD(a)
    
    case class X(callId: String, oCallId: String, 
      callTime: String, duration: String, calltype: String, swId: String)
    

    Then map() over the RDD to create instances of the case class, and build the DataFrame with toDF():

    scala> val df = rdd.map { 
      case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df: org.apache.spark.sql.DataFrame = 
      [callId: string, oCallId: string, callTime: string, 
        duration: string, calltype: string, swId: string]
    

    This infers the schema from the case class.

    Then you can continue with:

    scala> df.printSchema()
    root
     |-- callId: string (nullable = true)
     |-- oCallId: string (nullable = true)
     |-- callTime: string (nullable = true)
     |-- duration: string (nullable = true)
     |-- calltype: string (nullable = true)
     |-- swId: string (nullable = true)
    
    scala> df.show()
    +----------+-------+-------------------+--------+--------+----+
    |    callId|oCallId|           callTime|duration|calltype|swId|
    +----------+-------+-------------------+--------+--------+----+
    |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
    |4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
    +----------+-------+-------------------+--------+--------+----+
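
    Note that every column here is a string. If you need numeric columns, you can either convert the values while mapping to the case class (as another answer below does) or cast them afterwards (a sketch, assuming the Spark 1.3+ Column.cast API):

    scala> df.select(df("callId").cast("long")).printSchema()
    root
     |-- callId: long (nullable = true)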
    

    If you want to use toDF() in a regular program (not in the spark-shell), make sure (quoting from here) to do both of the following; a standalone sketch follows the list:

    • import sqlContext.implicits._ right after creating the SQLContext

    • define the case class outside of the method that uses toDF()
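
    A minimal sketch of such a standalone program (assuming Spark 1.x, where SQLContext is the entry point; the object name CallsApp and the app name are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // case class at the top level, outside the method that calls toDF()
    case class X(callId: String, oCallId: String,
      callTime: String, duration: String, calltype: String, swId: String)

    object CallsApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("CallsApp"))
        val sqlContext = new SQLContext(sc)
        // import the implicits right after creating the SQLContext
        import sqlContext.implicits._

        val rdd = sc.makeRDD(Array(
          Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
        val df = rdd.map {
          case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5)
        }.toDF()
        df.show()
        sc.stop()
      }
    }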

  • 1

    You need to convert the Array into a Row first and then define the schema. I am assuming that most of your fields are Long; the sample data has six columns, so the pattern below matches all six:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val rdd: RDD[Array[String]] = ???
    // pattern-match all six columns and convert the numeric ones to Long
    val rows: RDD[Row] = rdd map {
      case Array(callId, oCallId, callTime, duration, calltype, swId) =>
        Row(callId.toLong, oCallId, callTime, duration.toLong, calltype.toLong, swId.toLong)
    }

    object schema {
      val callId = StructField("callId", LongType)
      val oCallId = StructField("oCallId", StringType)
      val callTime = StructField("callTime", StringType)
      val duration = StructField("duration", LongType)
      val calltype = StructField("calltype", LongType)
      val swId = StructField("swId", LongType)

      val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
    }

    sqlContext.createDataFrame(rows, schema.struct)
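
    To sanity-check the result, call printSchema() on the DataFrame; with the schema above (where nullable defaults to true) it should print roughly:

    sqlContext.createDataFrame(rows, schema.struct).printSchema()
    // root
    //  |-- callId: long (nullable = true)
    //  |-- oCallId: string (nullable = true)
    //  |-- callTime: string (nullable = true)
    //  |-- duration: long (nullable = true)
    //  |-- calltype: long (nullable = true)
    //  |-- swId: long (nullable = true)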
    
  • 9

    I assume your schema is built as in the Spark Programming Guide, like this:

    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
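
    For this to compile outside the shell, the referenced types also need to be in scope (the standard Spark SQL type imports):

    import org.apache.spark.sql.types.{StructType, StructField, StringType}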
    

    If you look at the signatures of createDataFrame, here is the one that accepts a StructType as the second argument (for Scala):

    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

    Creates a DataFrame from an RDD containing Rows using the given schema.

    So it accepts an RDD[Row] as its first argument. What you have in rowRDD is an RDD[Array[String]], hence the mismatch.

    Do you really need an RDD[Array[String]]?

    Otherwise you can use the following to create your DataFrame:

    val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
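
    With an RDD[Row] in hand, the call from the question should then type-check (a sketch combining the schema and mapping above):

    val calDF = sqlContext.createDataFrame(rowRDD, schema)
    calDF.printSchema()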
    
