首页 文章

如何在PySpark中的大型Spark数据框中对每个行子集进行映射操作

提问于
浏览
0

我正在使用PySpark,我想要做的是以下内容:

一个大的Spark数据框df包含所有记录 . 我想对记录的每个子集进行并行计算,除以此df中的“id”列 . 我目前可以想到的方式如下:(我将用一个简单的例子来说明)

dicts = [
    {'id': 1,  'name': 'a',  'score':  100},
    {'id': 1,  'name': 'b',  'score':  150},
    {'id': 2,  'name': 'c',  'score':  200},
    {'id': 2,  'name': 'd',  'score':  300},
]
df = spark.createDataFrame(dicts)

from pyspark.sql.functions import (
    collect_list, 
    struct
)

# df_agg will have the following schema:   id,  a list of structs 
df_agg = df.groupBy('id').agg(
    collect_list(struct(df.columns)).alias('records')
)

但是,当我尝试做的时候

df_agg.rdd.map(my_func)

其中“my_func”是一些主要进行Spark数据帧计算的函数,我遇到了一些问题,并且不知道如何继续 . my_func在一行上运行,其中一行['records']现在保存一个结构列表 . 如何将此结构列表转换回Spark DataFrame?

toDF()不起作用 . 我尝试了spark.createDataFrame(list,schema),我甚至在原始DF使用的模式中输入,但它仍然不起作用 .

我对这些PySpark操作比较陌生,如果你能让我知道处理这种情况的正确方法,我将非常感谢你的帮助 .

谢谢!

1 回答

  • 0

    无法评论您在尝试 df_agg.rdd.map(my_func) 时遇到的错误(如果您提供 my_func 的示例,我可以去试试) . 但是,您提到无法转换为DataFrame,因此以下是该部分的解决方案:

    from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
    
    schema=StructType(
                   [StructField("id", IntegerType(), True), \
                    StructField("records", 
                        ArrayType(StructType([StructField("id", IntegerType(), True),\
                            StructField("name", StringType(), True),\
                            StructField("score", IntegerType(), True)])))
                   ])
    
    df_agg.rdd.toDF(schema=schema).show(2)
    

相关问题