首页 文章

使用Spark从DynamoDB JSON字符串中提取嵌套的Json字段?

提问于
浏览
3

我正在从Spark读取一个dynamodb表,这个表在一个字段中有一个JSON字符串,在其他字段中有字符串 . 我能够读取JSON字段,但不能读取嵌套的JSON字段 . 这不是query Json Column using dataframes的重复 . 问题确实解释了如何从JSON字符串中提取列,但不是从嵌套的JSON列中提取列 .

import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")

users.show(1)

样本数据集

|col1                                                        | ID | field2|field3|
 -------------------------------------------------------------------------------------
 |{"a":[{"b":"value1","x":23},{"b":value2,"x":52}],"c":"valC"}|A1  | X1    |Y1    |

我需要从col1(JSON结构)和ID字段中提取几个字段 . 我能够弄清楚如何解析JSON字段(col1)并从col1获取字段'c',如here所述,但无法提取嵌套字段 .

我的代码:

val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")

data.show(1,false)
|a                                              |c   |ID|
---------------------------------------------------------
|[{"b":"value1","x":23},{"b":value2","x":52}...]|valC|A1|

现在,当我尝试在上面的数据框上应用相同的get_json_object时,我得到所有空值 .

val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)

|get_json_object(a, '$.b')| c  | ID|
------------------------------------
|null                     |valC|A1 |

我试过爆炸,因为col'a'有数组和结构 . 但这不起作用,因为数据框'数据'将col / field'a'作为字符串而不是数组返回 . 任何想法如何解决这个问题?

更新:我还尝试使用JSON4s和net.liftweb.json.parse进行解析 . 这也没有帮助

case class aInfo(b: String) 
case class col1(a: Option[aInfo]), c: String)

import net.liftweb.json.parse
val parseJson = udf((data: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(data).extract[Data]
})

val parsed = users.withColumn("parsedJSON", parseJson($"data"))
parsed.show(1)

当我使用这些解析器时,所有值都显示为null .

我的预期结果:我试图从数据集中获得一个扁平的结构

|b     |x |c   | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |

1 回答

  • 2

    我相信拼图的所有必需部分已经在这里,所以让我们一步一步地遵循 . 您的数据相当于:

    val df = Seq((
      """{"a":[{"b":"value1"},{"b": "value2"}],"c":"valC"}""", "A1", "X1", "Y1"
    )).toDF("col1", "ID",  "field2", "field3")
    

    Spark提供的json4s实现与Lift相同的查询API:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    

    我们可以使用例如LINQ样式API来定义UDF:

    val getBs = udf((s: String) => for { 
      JString(b) <- parse(s) \ "a" \ "b" 
    } yield b)
    

    如果要提取多个字段,您当然可以扩展它 . 例如,如果JSON字符串有多个字段

    {"a":[{"b":"value1","d":1},{"b":"value2","d":2}],"c":"valC"}
    

    您可以:

    for  {
      JObject(a) <- parse(s) \ "a"
      JField("b", JString(b))  <- a
      JField("d", JInt(d))  <- a
    } yield (b, d)
    

    这假设两个字段都存在,否则将不匹配 . 要处理缺少的字段,您可能更喜欢XPath-like表达式或提取器:

    case class A(b: Option[String], d: Option[Int])
    
    (parse(s) \ "a").extract(Seq[A])
    

    像这样的UDF可以与 explode 一起使用来提取字段:

    val withBs = df.withColumn("b", explode(getBs($"col1")))
    

    结果:

    +--------------------+---+------+------+------+
    |                col1| ID|field2|field3|     b|
    +--------------------+---+------+------+------+
    |{"a":[{"b":"value...| A1|    X1|    Y1|value1|
    |{"a":[{"b":"value...| A1|    X1|    Y1|value2|
    +--------------------+---+------+------+------+
    

    您尝试使用Lift是不正确的,因为您希望 aaInfo 的序列,但仅将其定义为 Option[aInfo] . 它应该是 Option[Seq[aInfo]]

    case class col1(a: Option[Seq[aInfo]], c: String)
    

    对于像这样定义的类,解析应该没有问题 .

    如果使用当前版本(Spark 2.1.0),SPARK-17699引入了 from_json 方法,该方法需要一个架构:

    import org.apache.spark.sql.types._
    
    val bSchema = StructType(Seq(StructField("b", StringType, true)))
    val aSchema = StructField("a", ArrayType(bSchema), true)
    val cSchema = StructField("c", StringType, true)
    
    val schema =  StructType(Seq(aSchema, cSchema))
    

    并可以应用为:

    import org.apache.spark.sql.functions.from_json
    
    val parsed = df.withColumn("col1", from_json($"col1", schema))
    

    之后,您可以使用常用符号选择字段:

    parsed.select($"col1.a.b")
    

相关问题