首页 文章

pyspark / dataframe - 创建嵌套结构

提问于
浏览
2

我正在使用带有数据帧的pyspark,并希望创建一个嵌套结构,如下所示

之前:

Column 1 | Column 2 | Column 3 
--------------------------------
A    | B   | 1 
A    | B   | 2 
A    | C   | 1

后:

Column 1 | Column 4 
--------------------------------
A    | [B : [1,2]] 
A    | [C : [1]]

这可行吗?

2 回答

  • 1

    我不认为你可以得到那个确切的输出,但你可以接近 . 问题是列4的键名 . 在Spark中,结构需要预先知道一组固定的列 . 但是,让我们留待以后,首先,聚合:

    import pyspark
    from pyspark.sql import functions as F
    
    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc)
    
    data = [('A', 'B', 1), ('A', 'B', 2), ('A', 'C', 1)]
    columns = ['Column1', 'Column2', 'Column3']
    
    data = spark.createDataFrame(data, columns)
    
    data.createOrReplaceTempView("data")
    data.show()
    
    # Result
    +-------+-------+-------+
    |Column1|Column2|Column3|
    +-------+-------+-------+
    |      A|      B|      1|
    |      A|      B|      2|
    |      A|      C|      1|
    +-------+-------+-------+
    
    nested = spark.sql("SELECT Column1, Column2, STRUCT(COLLECT_LIST(Column3) AS data) AS Column4 FROM data GROUP BY Column1, Column2")
    nested.toJSON().collect()
    
    # Result
    ['{"Column1":"A","Column2":"C","Column4":{"data":[1]}}',
     '{"Column1":"A","Column2":"B","Column4":{"data":[1,2]}}']
    

    这几乎是你想要的,对吧?问题是,如果您事先不知道您的密钥名称(即第2列中的值),Spark无法确定您的数据结构 . 另外,我不完全确定如何使用列的值作为结构的键,除非您使用UDF(可能带有 PIVOT ?):

    datatype = 'struct<B:array<bigint>,C:array<bigint>>'  # Add any other potential keys here.
    @F.udf(datatype)
    def replace_struct_name(column2_value, column4_value):
        return {column2_value: column4_value['data']}
    
    nested.withColumn('Column5', replace_struct_name(F.col("Column2"), F.col("Column4"))).toJSON().collect()
    
    # Output
    ['{"Column1":"A","Column2":"C","Column4":{"C":[1]}}',
     '{"Column1":"A","Column2":"B","Column4":{"B":[1,2]}}']
    

    这当然具有以下缺点:键的数量必须是离散的并且事先已知,否则将默默地忽略其他键值 .

  • 0

    首先,您可以重现数据帧的示例 .

    js = [{"col1": "A", "col2":"B", "col3":1},{"col1": "A", "col2":"B", "col3":2},{"col1": "A", "col2":"C", "col3":1}]
    jsrdd = sc.parallelize(js)
    sqlContext = SQLContext(sc)
    jsdf = sqlContext.read.json(jsrdd)
    jsdf.show()
    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   A|   B|   1|
    |   A|   B|   2|
    |   A|   C|   1|
    +----+----+----+
    

    现在,列表不会存储为键值对 . 在column2上执行groupby之后,您可以使用 dictionary 或简单 collect_list() .

    jsdf.groupby(['col1', 'col2']).agg(F.collect_list('col3')).show()
    +----+----+------------------+
    |col1|col2|collect_list(col3)|
    +----+----+------------------+
    |   A|   C|               [1]|
    |   A|   B|            [1, 2]|
    +----+----+------------------+
    

相关问题