
RDD to DataFrame with Spark and Couchbase


I created an RDD from a NoSQL database and I want to convert the RDD to a DataFrame. I have tried many options, but they all result in errors.

val df = sc.couchbaseQuery(test).map(_.value).collect().foreach(println)


{"accountStatus":"AccountOpen","custId":"140034"}
{"accountStatus":"AccountOpen","custId":"140385"}
{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11364","custId":"140925","subStatus":"Paused"}
{"accountStatus":"AccountOpen","subId":"10413","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountOpen","subId":"10414","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11314","custId":"140720","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139166"}
{"accountStatus":"AccountClosed","subId":"10735","custId":"139558","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139575"}
df: Unit = ()

I tried adding .toDF() at the end of the code, and also tried building a schema and using createDataFrame, but I receive errors. What is the best way to convert the RDD to a DataFrame?

import org.apache.spark.sql.types._

// The schema is encoded in a string
val schemaString = "accountStatus subId custId subStatus"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//

val peopleDF = spark.createDataFrame(df,schema)

Error

<console>:101: error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
       val peopleDF = spark.createDataFrame(df,schema)

Another attempt

val df = sc.couchbaseQuery(test).map(_.value).toDF()

Error

<console>:93: error: value toDF is not a member of org.apache.spark.rdd.RDD[com.couchbase.client.java.document.json.JsonObject]
       val df1 = sc.couchbaseQuery(test).map(_.value).toDF()
                                                      ^

2 Answers

  • 0

    In your first example you are assigning val df to the result of the call to foreach, which has type Unit.

    Remove the calls to collect and foreach, and convert each JsonObject to a Row so that createDataFrame can apply the schema:

    // removed collect().foreach() here:
    val df = sc.couchbaseQuery(test).map(_.value)
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    
    // The schema is encoded in a string
    val schemaString = "accountStatus subId custId subStatus"
    
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    
    // createDataFrame expects an RDD[Row], so map each JsonObject to a Row;
    // fields missing from a document come back as null, matching the nullable schema
    val rowRDD = df.map(v => Row(
      v.getString("accountStatus"), v.getString("subId"),
      v.getString("custId"), v.getString("subStatus")))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    

    For the second approach, I suspect Spark SQL does not know how to handle the JsonObject type returned by the Couchbase client, so try mapping each value to a String and then have Spark SQL read the RDD as JSON.
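
    A minimal sketch of that idea, reusing the couchbaseQuery call from the question (jsonStrings and jsonDF are illustrative names, not from the original post):

    // map each JsonObject to its raw JSON string and let Spark SQL infer the schema
    val jsonStrings = sc.couchbaseQuery(test).map(_.value.toString)
    val jsonDF = spark.read.json(jsonStrings)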

  • 0

    Try the following approach:

    val data = spark.sparkContext
      .couchbaseQuery(N1qlQuery.simple(q), bucket)
      .map(_.value.toString())
    
    spark.read.json(data)
    

    Spark infers the schema from the Couchbase JSON strings themselves.
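
    Once the DataFrame is built, a quick way to inspect what Spark inferred (illustrative only; accountsDF is a made-up name and the exact columns depend on the documents in the bucket):

    val accountsDF = spark.read.json(data)
    accountsDF.printSchema()              // e.g. accountStatus, custId, subId, subStatus as strings
    accountsDF.show(5, truncate = false)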
