假设,我有一个json文件,其中包含以下结构中的行:
{
"a": 1,
"b": {
"bb1": 1,
"bb2": 2
}
}
我想更改键 bb1
的值或添加一个新键,如: bb3
. 目前,我使用spark.read.json将json文件作为DataFrame加载到spark中,并使用df.rdd.map将RDD的每一行映射到dict . 然后,更改嵌套键值或添加嵌套键并将dict转换为行 . 最后,将RDD转换为DataFrame . 工作流程如下:
def map_func(row):
dictionary = row.asDict(True)
adding new key or changing key value
return as_row(dictionary) # as_row convert dict to row recursively
df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")
这对我有用 . 但我担心转换DataFrame - > RDD(Row - > dict - > Row) - > DataFrame会导致效率下降 . 有没有其他方法可以满足这种需求,但不能以效率为代价?
我使用的最终解决方案是使用withColumn并动态构建b的模式 . 首先,我们可以通过以下方式从df架构获取 b_schema
:
b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')
之后, b_schema
是dict,我们可以通过以下方式添加新字段:
b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True})
然后,我们可以通过以下方式将其转换为StructType:
new_b = StructType.fromJson(b_schema)
在map_func中,我们可以将Row转换为dict并填充新字段:
def map_func(row):
data = row.asDict(True)
data['bb3'] = data['bb1'] + data['bb2']
return data
map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()
谢谢@Mariusz
1 回答
你可以使用
map_func
作为udf,因此省略转换DF - > RDD - > DF,仍然具有python的灵活性来实现业务逻辑 . 您只需要创建模式对象:然后定义
map_func
和udf:最后将此UDF应用于数据帧:
EDIT :
根据评论:您可以更简单的方式向现有StructType添加字段,例如: