首页 文章

展平嵌套的Spark Dataframe

提问于
浏览
4

有没有办法压缩任意嵌套的Spark Dataframe?我所看到的大部分工作都是针对特定的模式编写的,我希望能够通过不同的嵌套类型(例如StructType,ArrayType,MapType等)来泛化一个Dataframe .

假设我有一个类似的架构:

StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...)))

希望将其调整为具有如下结构的平台:

field1
field2
nested_array.nested_field1
nested_array.nested_field2

仅供参考,寻找Pyspark的建议,但其他风味的Spark也值得赞赏 .

3 回答

  • 1

    这个问题可能有点旧,但对于那些仍在寻找解决方案的人来说,你可以使用select *来内联复杂的数据类型:

    首先让我们创建嵌套的数据帧:

    from pyspark.sql import HiveContext
    hc = HiveContext(sc)
    nested_df = hc.read.json(sc.parallelize(["""
    {
      "field1": 1, 
      "field2": 2, 
      "nested_array":{
         "nested_field1": 3,
         "nested_field2": 4
      }
    }
    """]))
    

    现在要压扁它:

    flat_df = nested_df.select("field1", "field2", "nested_array.*")
    

    你会在这里找到有用的例子https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

    如果嵌套数组太多,可以使用:

    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols])
    
  • -3

    这是我最后的方法:

    1)将数据帧中的行映射到dict的rdd . 在线找到合适的python代码来展平字典 .

    flat_rdd = nested_df.map(lambda x : flatten(x))
    

    哪里

    def flatten(x):
      x_dict = x.asDict()
      ...some flattening code...
      return x_dict
    

    2)将RDD [dict]转换回数据帧

    flat_df = sqlContext.createDataFrame(flat_rdd)
    
  • 9

    使用来自pyspark.sql.functions的爆炸

相关问题