Convert a dataframe to an array of nested JSON objects in PySpark




I created a dataframe as follows:

+----+-------+-------+
| age| number|name   |
+----+-------+-------+
|  16|     12|A      |
|  16|     13|B      |
|  17|     16|E      |
|  17|     17|F      |
+----+-------+-------+

How do I convert it to the following JSON?

{ 
  'age' : 16,  
  'values' : [{'number': '12', 'name': 'A'}, {'number': '13', 'name': 'B'}] 
},{ 
  'age' : 17,  
  'values' : [{'number': '16', 'name': 'E'}, {'number': '17', 'name': 'F'}] 
}


2 Answers

  • 2

    You can convert the DF to an RDD and apply your transformations there:

    import json
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    # Schema for the result: age plus the aggregated values as a JSON string
    NewSchema = StructType([StructField("age", IntegerType()),
                            StructField("values", StringType())
                            ])
    
    # Wrap each row's (number, name) in a one-element list, concatenate the
    # lists per age with reduceByKey, then serialize each list to JSON
    res_df = df.rdd.map(lambda row: (row[0], [{'number': row[1], 'name': row[2]}]))\
        .reduceByKey(lambda x, y: x + y)\
        .map(lambda row: (row[0], json.dumps(row[1])))\
        .toDF(NewSchema)
    
    res_df.show(20, False)
    

    Showing res_df:

    +---+-----------------------------------------------------------+
    |age|values                                                     |
    +---+-----------------------------------------------------------+
    |16 |[{"number": 12, "name": "A"}, {"number": 13, "name": "B"}] |
    |17 |[{"number": 17, "name": "F"}, {"number": 16, "name": "E"}] |
    +---+-----------------------------------------------------------+
    

    Save the DF as a JSON file:

    res_df.coalesce(1).write.format('json').save('output.json')
    
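    The snippet above needs a live SparkSession to run. The grouping that `map` + `reduceByKey` performs can be sketched in plain Python; the `rows` list below is a hand-copied stand-in for the question's dataframe:

```python
import json
from collections import defaultdict

# Stand-in for the question's dataframe: (age, number, name)
rows = [(16, 12, "A"), (16, 13, "B"), (17, 16, "E"), (17, 17, "F")]

# Equivalent of map -> reduceByKey: collect the (number, name) dicts per age
grouped = defaultdict(list)
for age, number, name in rows:
    grouped[age].append({"number": number, "name": name})

# Equivalent of the final map: serialize each age's list to a JSON string
result = {age: json.dumps(values) for age, values in grouped.items()}
print(result[16])  # [{"number": 12, "name": "A"}, {"number": 13, "name": "B"}]
```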

  • 1

    Assuming df is your dataframe:

    from pyspark.sql import functions as F
    
    new_df = df.select(
        "age",
        F.struct(
            F.col("number"),
            F.col("name"),
        ).alias("values")
    ).groupBy(
        "age"
    ).agg(
        F.collect_list("values").alias("values")
    )
    
    new_df.toJSON()
    # or
    new_df.write.json(...)
    
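    The structure that `F.struct` + `collect_list` builds can likewise be sketched in plain Python, again assuming the sample rows from the question's table; each dict below corresponds to one line of `new_df.toJSON()` output:

```python
import json

# Stand-in for the question's dataframe: (age, number, name)
rows = [(16, 12, "A"), (16, 13, "B"), (17, 16, "E"), (17, 17, "F")]

# One record per age group, with a list of {number, name} structs,
# mirroring groupBy("age") + collect_list of the struct column
records = []
for age in sorted({r[0] for r in rows}):
    values = [{"number": n, "name": nm} for a, n, nm in rows if a == age]
    records.append({"age": age, "values": values})

# toJSON() / write.json emit one such JSON record per line
for rec in records:
    print(json.dumps(rec))
```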