
Dropping nested columns from a DataFrame using PySpark

I am trying to drop some nested columns from a Spark dataframe using pyspark. I found the following answer for Scala, which seems to do what I want, but I'm not familiar with Scala and don't know how to write it in Python:

https://stackoverflow.com/a/39943812/5706548

I would really appreciate some help.

Thanks,

3 Answers

  • 0

    The way I found to do this with pyspark is to first convert the nested column to JSON, then parse the JSON back using a new nested schema with the unwanted columns filtered out.

    Suppose I have the following schema and I want to drop d and e (a.b.d and a.e) from the dataframe:

    root
     |-- a: struct (nullable = true)
     |    |-- b: struct (nullable = true)
     |    |    |-- c: long (nullable = true)
     |    |    |-- d: string (nullable = true)
     |    |-- e: struct (nullable = true)
     |    |    |-- f: long (nullable = true)
     |    |    |-- g: string (nullable = true)
     |-- h: string (nullable = true)
    

    I used the following approach:

    • Create a new schema that excludes d and e. A quick way to do this is to manually pick the fields you want from df.select("a").schema and build a new schema from them with StructType. Alternatively, you can do it programmatically by traversing the schema tree and excluding the unwanted fields, for example:
    from pyspark.sql.types import StructType, StructField

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        new_schema = []

        for field in schema:
            # Build the dotted path of this field, e.g. "a.b.d"
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name

            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    # Recurse into nested structs, pruning unwanted children
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema, field.nullable))
                else:
                    new_schema.append(StructField(field.name, field.dataType, field.nullable))

        return StructType(new_schema)

    new_schema = exclude_nested_field(df.select("a").schema, ["a.b.d", "a.e"])
    
    • Convert the a column to JSON: F.to_json("a")

    • Parse the JSON-converted a column from step 2 using the new schema found in step 1: F.from_json("a_json", new_schema). A sketch combining all three steps follows below.
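
    A minimal end-to-end sketch of the three steps, assuming df has the schema shown above and exclude_nested_field is defined as in step 1 (the variable names a_json, df_clean and inner_schema are illustrative):

    from pyspark.sql import functions as F

    # Step 1: build the pruned schema; it wraps the selected top-level
    # field "a", so take the inner struct type for from_json
    new_schema = exclude_nested_field(df.select("a").schema, ["a.b.d", "a.e"])
    inner_schema = new_schema["a"].dataType

    df_clean = (
        df.withColumn("a_json", F.to_json("a"))                  # step 2: struct -> JSON string
          .withColumn("a", F.from_json("a_json", inner_schema))  # step 3: parse with pruned schema
          .drop("a_json")                                        # remove the helper column
    )
    df_clean.printSchema()  # a.b.d and a.e are gone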

  • 0

    Although I've no solution for PySpark, maybe it's easier to translate this into Python. Consider a dataframe df with the schema:

    root
     |-- employee: struct (nullable = false)
     |    |-- name: string (nullable = false)
     |    |-- age: integer (nullable = false)
    

    Then if you want to drop, for example, name, you can do:

    import org.apache.spark.sql.functions.struct

    val fieldsToKeep = df.select($"employee.*").columns
      .filter(_ != "name") // the nested column you want to drop
      .map(n => "employee." + n)

    // overwrite the column with the subset of fields
    df.withColumn("employee", struct(fieldsToKeep.head, fieldsToKeep.tail: _*))
    
  • 1

    Pyspark version:

    from pyspark.sql.functions import struct

    def drop_col(df, col_nm, delete_col_nm):
        # Keep every nested field except the one to drop, re-qualified with the parent name
        fields_to_keep = filter(lambda x: x != delete_col_nm, df.select("{}.*".format(col_nm)).columns)
        fields_to_keep = list(map(lambda x: "{}.{}".format(col_nm, x), fields_to_keep))
        return df.withColumn(col_nm, struct(fields_to_keep))
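
    Hypothetical usage against the employee schema from the previous answer (the sample data here is made up for illustration):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(("Alice", 30),)], "employee struct<name:string, age:int>")
    drop_col(df, "employee", "name").printSchema()
    # employee now contains only the `age` field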
    
