将标准python键值字典列表转换为pyspark数据帧

考虑我有一个python字典键值对列表,其中key对应于表的列名,所以对于下面的列表如何将其转换为带有两个cols arg1 arg2的pyspark数据帧?

[{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]

我如何使用以下构造来做到这一点?

df = sc.parallelize([
    ...
]).toDF

在上面的代码中放置arg1 arg2的位置(...)

回答(2)

2 years ago

旧方式:

sc.parallelize([{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]).toDF()

新方法:

from pyspark.sql import Row
from collections import OrderedDict

def convert_to_row(d: dict) -> Row:
    return Row(**OrderedDict(sorted(d.items())))

sc.parallelize([{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]) \
    .map(convert_to_row) \ 
    .toDF()

2 years ago

我必须修改接受的答案,以便在运行Spark 2.0的Python 2.7中为我工作 .

from collections import OrderedDict
from pyspark.sql import SparkSession, Row

spark = (SparkSession
        .builder
        .getOrCreate()
    )

schema = StructType([
    StructField('arg1', StringType(), True),
    StructField('arg2', StringType(), True)
])

dta = [{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]

dtaRDD = spark.sparkContext.parallelize(dta) \
    .map(lambda x: Row(**OrderedDict(sorted(x.items()))))

dtaDF = spark.createDataFrame(dtaRdd, schema)