Suppose I have a list of Python dictionaries (key/value pairs), where each key corresponds to a column name of a table. Given the list below, how do I convert it into a PySpark DataFrame with the two columns arg1 and arg2?
[{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]
How can I do this using the following construct?

df = sc.parallelize([ ... ]).toDF()

That is, where do arg1 and arg2 go in place of the (...) in the code above?
2 Answers

Old way:
sc.parallelize([{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]).toDF()
New way:
from pyspark.sql import Row
from collections import OrderedDict

def convert_to_row(d: dict) -> Row:
    return Row(**OrderedDict(sorted(d.items())))

sc.parallelize([{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]) \
    .map(convert_to_row) \
    .toDF()
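As an alternative on Spark 2.0+, you can skip the RDD entirely and pass the list of dictionaries to createDataFrame with an explicit schema. This is a minimal sketch, assuming a SparkSession is available as spark and that both columns are nullable strings:

from pyspark.sql.types import StructType, StructField, StringType

# An explicit schema avoids the deprecated dict-based schema inference.
schema = StructType([
    StructField("arg1", StringType(), True),
    StructField("arg2", StringType(), True)
])

data = [{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]

# createDataFrame accepts a plain Python list of dicts; no parallelize needed.
df = spark.createDataFrame(data, schema)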
I had to modify the accepted answer to get it to work for me with Python 2.7 running Spark 2.0.
from collections import OrderedDict
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

spark = (SparkSession
         .builder
         .getOrCreate())

schema = StructType([
    StructField('arg1', StringType(), True),
    StructField('arg2', StringType(), True)
])

dta = [{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]

# Use a lambda instead of an annotated function, since Python 2.7
# does not support type annotations.
dtaRDD = spark.sparkContext.parallelize(dta) \
    .map(lambda x: Row(**OrderedDict(sorted(x.items()))))

dtaDF = spark.createDataFrame(dtaRDD, schema)
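To sanity-check the result, the DataFrame can be inspected in the usual way (expected output shown as comments, assuming the code above ran in an active session):

dtaDF.printSchema()
# root
#  |-- arg1: string (nullable = true)
#  |-- arg2: string (nullable = true)
dtaDF.show()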