将数据帧转换为pyspark中嵌套的json对象的数组

我创建了如下数据框:

+----+-------+-------+
| age| number|name   |
+----+-------+-------+
|  16|     12|A      |
|  16|     13|B      |
|  17|     16|E      |
|  17|     17|F      |
+----+-------+-------+

如何将其转换为以下json:

{ 
'age' : 16,  
'values' : [{‘number’: ‘12’ , ‘name’ : 'A'},{‘number’: ‘12’ , ‘name’ : 'A'} ] 
},{ 
'age' : 17,  
'values' : [{‘number’: ‘16’ , ‘name’ : 'E'},{‘number’: ‘17’ , ‘name’ : 'F'} ] 
}

回答(2)

2 years ago

您可以将DF转换为RDD并应用您的转换:

NewSchema = StructType([StructField("age", IntegerType())
                           , StructField("values", StringType())
                        ])


res_df = df.rdd.map(lambda row: (row[0], ([{'number':row[1], 'name':row[2]}])))\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda row: (row[0], json.dumps(row[1])))\
    .toDF(NewSchema)

res_df.show(20, False)

显示res_df:

+---+------------------------------------------------------------+
|age|values                                                      |
+---+------------------------------------------------------------+
|16 |[{"number": 12, "name": "A"}, [{"number": 13, "name": "B"}] |
|17 |[{"number": 17, "name": "F"}, [{"number": 16, "name": "E"}] |
+---+------------------------------------------------------------+

将DF保存为JSON文件:

res_df.coalesce(1).write.format('json').save('output.json')

2 years ago

假设 df 是您的数据帧,

from pyspark.sql import functions as F

new_df = df.select(
    "age",
    F.struct(
        F.col("number"),
        F.col("name"),
    ).alias("values")
).groupBy(
    "age"
).agg(
    F.collect_list("values").alias("values")
)

new_df.toJSON()
# or
new_df.write.json(...)