
How to convert a DataFrame into an RDD of (key string, JSON string) in Spark


I created a DataFrame from a JSON file as follows.

val df = sqlContext.read.json("my.json")

After that, I want to build an RDD of (key, JSON) from the Spark DataFrame. I found df.toJSON, but it produces an RDD[String].

What I want is an RDD[(String, String)] of (key, JSON) pairs. How can I convert a Spark DataFrame into such an RDD[(String, String)]?

My DataFrame's schema looks like this.

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data1: string (nullable = true)
 |    |    |-- data2: double (nullable = true)
 |    |    |-- data3: double (nullable = true)
 |-- image: string (nullable = true)
 |-- flag: boolean (nullable = true)
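
To make the goal concrete, here is a minimal sketch of the types involved (assuming Spark 1.x, where toJSON returns an RDD directly):

import org.apache.spark.rdd.RDD

// What I have: one JSON string per row, with no key attached.
val asJson: RDD[String] = df.toJSON

// What I want: the id column paired with each row's JSON representation.
// val keyed: RDD[(String, String)] = ???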

2 Answers

  • 0

    I can offer a solution, though I don't know your exact use case: if you want to extract id as the key that distinguishes the JSON records, I think you could simply work with the id column on the DataFrame itself. In any case, the following approach does what you asked for:

    Test JSON file: test.json

    {"id": "1","data": "data1","image": "image1"}
    {"id": "2","data": "data2","image": "image2"}
    

    Scala code (withColumn + UDF + json4s):

    import org.json4s.{DefaultFormats, MappingException}
    import org.json4s.jackson.JsonMethods._
    import org.apache.spark.sql.functions._

    // Parse a JSON string with json4s and extract the "id" field as the key.
    def getJsonKey(jsonstring: String): String = {
        implicit val formats = DefaultFormats
        val parsedJson = parse(jsonstring)
        val key = (parsedJson \ "id").extract[String]
        key
    }
    // Wrap it as a UDF so it can be applied to a DataFrame column.
    val getJsonKeyUDF = udf((jsonstring: String) => getJsonKey(jsonstring))
    
    // Read each line as raw text, so every row holds one complete JSON string.
    val df = spark.read.format("text").load("test.json")
    +--------------------+
    |               value|
    +--------------------+
    |{"id": "1","data"...|
    |{"id": "2","data"...|
    +--------------------+
    
    // Attach the extracted key, keep the raw JSON string, and select both.
    val newDF = df.withColumn("key", getJsonKeyUDF(df("value")))
                  .withColumn("json", df("value"))
                  .select("key", "json")
    newDF.show
    +---+--------------------+
    |key|                json|
    +---+--------------------+
    |  1|{"id": "1","data"...|
    |  2|{"id": "2","data"...|
    +---+--------------------+
    
    newDF.rdd.collect
    >> Array[org.apache.spark.sql.Row] = Array([1,{"id": "1","data": "data1","image": "image1"}], [2,{"id": "2","data": "data2","image": "image2"}])
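
    Note that newDF.rdd is still an RDD[Row]; to get the RDD[(String, String)] the question asks for, one more map is needed (a small sketch):

    // Turn the RDD[Row] into the requested RDD[(String, String)].
    val pairRDD = newDF.rdd.map(row => (row.getString(0), row.getString(1)))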
    

    Update:

    If you really want to end up with an RDD, there is a simpler way:

    import org.json4s.{DefaultFormats, MappingException}
    import org.json4s.jackson.JsonMethods._
    
    val rdd = sc.textFile("test.json")
    >> rdd: org.apache.spark.rdd.RDD[String]
    
    // Parse each line, pull out the id as the key, keep the raw JSON as the value.
    val newRDD = rdd.map(jsonstring => {
        implicit val formats = DefaultFormats
        val parsedJson = parse(jsonstring)
        val key = (parsedJson \ "id").extract[String]
        (key, jsonstring)
    })
    >> newRDD: org.apache.spark.rdd.RDD[(String, String)]
    
    newRDD.collect
    Array[(String, String)] = Array((1,{"id": "1","data": "data1","image": "image1"}), (2,{"id": "2","data": "data2","image": "image2"}))
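
    Since newRDD is a pair RDD, the usual key-based operations are now available; for example (hypothetical usage):

    // Fetch all JSON strings stored under key "1".
    newRDD.lookup("1")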
    
  • 0

    Simplifying Linbo's answer: Spark SQL already ships a built-in function, get_json_object, for extracting JSON fields:

    import sqlc.implicits._  // sqlc is the SQLContext; needed for .toDF on an RDD

    val rdd = sc.textFile("example.json")
      .toDF("json")
      .selectExpr("get_json_object(json, '$.id')", "json")
      .map { row => (row.getString(0), row.getString(1)) }
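
    If you already have the parsed DataFrame from the question (read via read.json) and are on Spark 2.1+, the same pairing can be done with the built-in to_json and struct functions; a minimal sketch, assuming the column names from the question's schema:

    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Serialize every row back into a JSON string and pair it with its id column.
    val pairRdd = df.select(col("id"), to_json(struct(df.columns.map(col): _*)))
      .rdd
      .map(row => (row.getString(0), row.getString(1)))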
    
