首页 文章

如何在Spark中读取嵌套集合

提问于
浏览
17

我有一张镶有 table 的镶木 table

,array <struct <col1,col2,.. colN >>

可以使用LATERAL VIEW语法在Hive中对此表运行查询 .

如何将此表读入RDD,更重要的是如何在Spark中过滤,映射等嵌套集合?

在Spark文档中找不到对此的任何引用 . 提前感谢您的任何信息!

PS . 感觉可能有助于在 table 上给出一些统计数据 . 主表~600中的列数 . 行数~200m . 嵌套集合中的“列”数〜10 . 平均集合中的平均记录数~35 .

4 回答

  • 8

    在嵌套集合的情况下没有魔力 . Spark将以 RDD[(String, String)]RDD[(String, Seq[String])] 的相同方式处理 .

    但是,从Parquet文件中读取这样的嵌套集合可能会很棘手 .

    我们来自 spark-shell (1.3.1)的例子:

    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Inner(a: String, b: String)
    defined class Inner
    
    scala> case class Outer(key: String, inners: Seq[Inner])
    defined class Outer
    

    写下镶木地板文件:

    scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
    outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
    
    scala> outers.toDF.saveAsParquetFile("outers.parquet")
    

    阅读镶木地板文件:

    scala> import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.sql.catalyst.expressions.Row
    
    scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
    dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   
    
    scala> val outers = dataFrame.map { row =>
         |   val key = row.getString(0)
         |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
         |   Outer(key, inners)
         | }
    outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
    

    重要的是 row.getAs[Seq[Row]](1) . 嵌套序列 struct 的内部表示是 ArrayBuffer[Row] ,您可以使用它的任何超类型而不是 Seq[Row] . 1 是外行中的列索引 . 我在这里使用了 getAs 方法,但最新版本的Spark还有其他选择 . 请参阅Row trait的源代码 .

    现在您有一个 RDD[Outer] ,您可以应用任何想要的转换或操作 .

    // Filter the outers
    outers.filter(_.inners.nonEmpty)
    
    // Filter the inners
    outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
    

    请注意,我们仅使用spark-SQL库来读取镶木地板文件 . 例如,您可以在将数据映射到RDD之前直接在DataFrame上选择所需的列 .

    dataFrame.select('col1, 'col2).map { row => ... }
    
  • 19

    我会给出一个基于Python的答案,因为那是我正在使用的 . 我认为Scala有类似的东西 .

    根据Python API docs,在Spark 1.4.0中添加了 explode 函数来处理DataFrames中的嵌套数组 .

    创建测试数据框:

    from pyspark.sql import Row
    
    df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
    df.show()
    
    ## +-+--------------------+
    ## |a|             intlist|
    ## +-+--------------------+
    ## |1|ArrayBuffer(1, 2, 3)|
    ## |2|ArrayBuffer(4, 5, 6)|
    ## +-+--------------------+
    

    使用 explode 展平列表列:

    from pyspark.sql.functions import explode
    
    df.select(df.a, explode(df.intlist)).show()
    
    ## +-+---+
    ## |a|_c0|
    ## +-+---+
    ## |1|  1|
    ## |1|  2|
    ## |1|  3|
    ## |2|  4|
    ## |2|  5|
    ## |2|  6|
    ## +-+---+
    
  • 3

    另一种方法是使用这样的模式匹配:

    val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
      case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
        case List(a:String, b: String) => (a, b)
      }).toList
    })
    

    您可以直接在Row上进行模式匹配,但由于某些原因可能会失败 .

  • 1

    以上答案都是很好的答案,并从不同方面解决这个问题; Spark SQL也是访问嵌套数据的非常有用的方法 .

    下面是如何在SQL中直接使用explode()来查询嵌套集合的示例 .

    SELECT hholdid, tsp.person_seq_no 
    FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
            FROM disc_mrt.unified_fact uf
         )
    

    tsp_ids是一个嵌套的结构体,它有许多属性,包括我在上面的外部查询中选择的person_seq_no .

    以上是在Spark 2.0中测试的 . 我做了一个小测试,它在Spark 1.6中不起作用 . 当Spark 2不在时问这个问题,所以这个答案很好地补充了处理嵌套结构的可用选项列表 .

    在用于SQL访问的explode()上,无法解析JIRA:

相关问题