首页 文章

如何更改DataFrame的架构(以修复某些嵌套字段的名称)?

提问于
浏览
2

我有一个问题,当我们将一个Json文件加载到Spark中时,将其存储为Parquet,然后尝试从Impala访问Parquet文件; Impala抱怨列的名称,因为它们包含在SQL中非法的字符 .

JSON文件的一个“功能”是它们没有预定义的架构 . 我希望Spark创建模式,然后我必须修改具有非法字符的字段名称 .

我的第一个想法是在DataFrame中的字段名称上使用 withColumnRenamed 但这只适用于我认为的顶级字段,所以我不能使用它,因为Json包含嵌套数据 .

所以我创建了以下代码来重新创建DataFrames模式,以递归方式遍历结构 . 然后我使用新架构重新创建DataFrame .

(代码更新了Jacek建议使用Scala复制构造函数的改进 . )

def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType = {
  StructType(schema.fields.map { field =>
    field.dataType match {
      case struct: StructType =>
        field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
      case _ =>
        field.copy(name = replaceIllegal(field.name))
    }
  })
}

sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))

这很有效 . 但有没有更好/更简单的方法来实现我想做的事情?

是否有更好的方法来替换DataFrame上的现有架构?以下代码不起作用:

df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))

它给出了这个错误:

org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'

1 回答

  • 1

    我认为最好的办法是将数据集(在保存为镶木地板文件之前)转换为RDD,并使用自定义模式根据需要描述结构 .

    val targetSchema: StructType = ...
    val fromJson: DataFrame = ...
    val targetDataset = spark.createDataFrame(fromJson.rdd, targetSchema)
    

    请参阅SparkSession.createDataFrame中的示例作为参考,但是当您要从数据集创建RDD时,它会直接使用RDD .

    val schema =
      StructType(
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, true) :: Nil)
    
    val people =
      sc.textFile("examples/src/main/resources/people.txt").map(
        _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
    val dataFrame = sparkSession.createDataFrame(people, schema)
    dataFrame.printSchema
    // root
    // |-- name: string (nullable = false)
    // |-- age: integer (nullable = true)
    

    但正如您在评论中提到的那样(我后来合并到您的问题中):

    JSON文件没有预定义的架构 .

    话虽如此,我认为您的解决方案是正确的 . Spark没有提供开箱即用的类似功能,我认为它更多的是开发一个自定义的Scala代码,它将遍历 StructType / StructField 树并改变不正确的内容 .

    我建议你在代码中改变的是使用 copy 构造函数(Scala案例类的一个特性 - 请参阅A Scala case class ‘copy’ method example),它只会更改不正确的名称,而其他属性不会受到影响 .

    使用 copy 构造函数将(大致)对应于以下代码:

    // was
    // case s: StructType =>
    //    StructField(replaceIllegal(field.name), removeIllegalCharsInColumnNames(s), field.nullable, field.metadata)
    s.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(s))
    

    功能语言(通常)和Scala(特别是)中有一些设计模式可以处理深层嵌套结构操作,但这可能太多了(而且我对分享它犹豫不决) .

    因此,我认为问题在于其当前的“形状”更多关于如何将树操作为数据结构,而不一定是Spark模式 .

相关问题