
Converting a list of standard Python key-value dictionaries to a PySpark DataFrame


Suppose I have a Python list of key-value dictionaries, where each key corresponds to a column name of the table. For the list below, how can I convert it into a PySpark DataFrame with the two columns arg1 and arg2?

[{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]

How can I do this with the following construct?

df = sc.parallelize([
    ...
]).toDF()

In the above code, where (in place of the ...) do arg1 and arg2 go?
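
For reference, with a SparkSession available the same list can also be passed straight to createDataFrame; a minimal sketch (on Spark 2.x this emits a deprecation warning about inferring the schema from dicts, but still works; the names spark, data, and df below are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]

# Column names are taken from the dict keys, so the result has columns arg1 and arg2.
df = spark.createDataFrame(data)
df.show()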

2 Answers

  • 0

    Old way:

    sc.parallelize([{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]).toDF()
    

    New way:

    from pyspark.sql import Row
    from collections import OrderedDict
    
    def convert_to_row(d: dict) -> Row:
        return Row(**OrderedDict(sorted(d.items())))
    
    sc.parallelize([{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""},{"arg1": "", "arg2": ""}]) \
        .map(convert_to_row) \
        .toDF()
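
    A quick way to sanity-check the result of the new way, assuming a live SparkContext sc and the convert_to_row helper above (the name df is illustrative):

    df = sc.parallelize([{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]) \
        .map(convert_to_row) \
        .toDF()

    df.printSchema()  # both columns are inferred as strings
    df.show()         # columns come out as arg1, arg2 because the dict keys are sorted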
    
  • 17

    I had to modify the accepted answer to get it working for me on Python 2.7 with Spark 2.0.

    from collections import OrderedDict
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType, StructField, StringType
    
    spark = (SparkSession
            .builder
            .getOrCreate()
        )
    
    schema = StructType([
        StructField('arg1', StringType(), True),
        StructField('arg2', StringType(), True)
    ])
    
    dta = [{"arg1": "", "arg2": ""}, {"arg1": "", "arg2": ""}]
    
    dtaRDD = spark.sparkContext.parallelize(dta) \
        .map(lambda x: Row(**OrderedDict(sorted(x.items()))))
    
    dtaDF = spark.createDataFrame(dtaRDD, schema)
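
    To confirm the explicit schema was applied as declared, the resulting dtaDF can be inspected; a minimal sketch:

    print(dtaDF.columns)   # ['arg1', 'arg2'], in the order declared in the schema
    dtaDF.printSchema()    # two nullable StringType fields
    dtaDF.show()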
    
