首页 文章

如何使用Spark DataFrames查询JSON数据列?

提问于
浏览
19

我有一个Cassandra表,为简单起见,看起来像:

key: text
jsonData: text
blobData: blob

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

我正在努力将JSON数据扩展到其底层结构中 . 我最终希望能够根据json字符串中的属性进行过滤并返回blob数据 . 像jsonData.foo =“bar”之类的东西并返回blobData . 这目前可能吗?

3 回答

  • 2

    Spark 2.1+

    你可以使用from_json功能:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("k", StringType, true), StructField("v", DoubleType, true)
    ))
    
    df.withColumn("jsonData", from_json($"jsonData", schema))
    

    Spark 1.6+

    您可以使用带有列和路径的 get_json_object

    import org.apache.spark.sql.functions.get_json_object
    
    val exprs = Seq("k", "v").map(
      c => get_json_object($"jsonData", s"$$.$c").alias(c))
    
    df.select($"*" +: exprs: _*)
    

    并将字段提取到单个字符串,这些字符串可以进一步转换为预期类型 .

    path 参数使用点语法表示,前导 $. 表示文档根(因为上面的代码使用字符串插值 $ 必须进行转义,因此 $$. ) .

    Spark <= 1.5

    目前有可能吗?

    据我所知,这不是直接可能的 . 你可以尝试类似的东西:

    val df = sc.parallelize(Seq(
      ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
      ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
    )).toDF("key", "jsonData", "blobData")
    

    我假设 blob 字段不能用JSON表示 . 否则你的出租车省略拆分和加入:

    import org.apache.spark.sql.Row
    
    val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
    val jsons = sqlContext.read.json(df.drop("blobData").map{
      case Row(key: String, json: String) =>
        s"""{"key": "$key", "jsonData": $json}"""
    }) 
    
    val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
    parsed.printSchema
    
    // root
    //  |-- jsonData: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: double (nullable = true)
    //  |-- key: long (nullable = true)
    //  |-- blobData: string (nullable = true)
    

    另一种(更便宜,但更复杂)方法是使用UDF来解析JSON并输出 structmap 列 . 例如这样的事情:

    import net.liftweb.json.parse
    
    case class KV(k: String, v: Int)
    
    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })
    
    val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
    parsed.show
    
    // +---+--------------------+------------------+----------+
    // |key|            jsonData|          blobData|parsedJSON|
    // +---+--------------------+------------------+----------+
    // |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
    // |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
    // +---+--------------------+------------------+----------+
    
    parsed.printSchema
    
    // root
    //  |-- key: string (nullable = true)
    //  |-- jsonData: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    //  |-- parsedJSON: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: integer (nullable = false)
    
  • 39

    from_json 功能正是您所需要的 . 您的代码将类似于:

    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
      .load()
    
    //You can define whatever struct type that your json states
    val schema = StructType(Seq(
      StructField("key", StringType, true), 
      StructField("value", DoubleType, true)
    ))
    
    df.withColumn("jsonData", from_json(col("jsonData"), schema))
    
  • 1

    底层JSON字符串是

    "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
    

    下面是过滤JSON并将所需数据加载到Cassandra的脚本 .

    sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
                .write.format("org.apache.spark.sql.cassandra")
                .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
                .mode(SaveMode.Append)
                .save()
    

相关问题