我有一个问题,当我们将一个Json文件加载到Spark中时,将其存储为Parquet,然后尝试从Impala访问Parquet文件; Impala抱怨列的名称,因为它们包含在SQL中非法的字符 .
JSON文件的一个“功能”是它们没有预定义的架构 . 我希望Spark创建模式,然后我必须修改具有非法字符的字段名称 .
我的第一个想法是在DataFrame中的字段名称上使用 withColumnRenamed
但这只适用于我认为的顶级字段,所以我不能使用它,因为Json包含嵌套数据 .
所以我创建了以下代码来重新创建DataFrames模式,以递归方式遍历结构 . 然后我使用新架构重新创建DataFrame .
(代码更新了Jacek建议使用Scala复制构造函数的改进 . )
def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType = {
StructType(schema.fields.map { field =>
field.dataType match {
case struct: StructType =>
field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
case _ =>
field.copy(name = replaceIllegal(field.name))
}
})
}
sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))
这很有效 . 但有没有更好/更简单的方法来实现我想做的事情?
是否有更好的方法来替换DataFrame上的现有架构?以下代码不起作用:
df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))
它给出了这个错误:
org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'
1 回答
我认为最好的办法是将数据集(在保存为镶木地板文件之前)转换为RDD,并使用自定义模式根据需要描述结构 .
请参阅SparkSession.createDataFrame中的示例作为参考,但是当您要从数据集创建RDD时,它会直接使用RDD .
但正如您在评论中提到的那样(我后来合并到您的问题中):
话虽如此,我认为您的解决方案是正确的 . Spark没有提供开箱即用的类似功能,我认为它更多的是开发一个自定义的Scala代码,它将遍历
StructType
/StructField
树并改变不正确的内容 .我建议你在代码中改变的是使用
copy
构造函数(Scala案例类的一个特性 - 请参阅A Scala case class ‘copy’ method example),它只会更改不正确的名称,而其他属性不会受到影响 .使用
copy
构造函数将(大致)对应于以下代码:功能语言(通常)和Scala(特别是)中有一些设计模式可以处理深层嵌套结构操作,但这可能太多了(而且我对分享它犹豫不决) .
因此,我认为问题在于其当前的“形状”更多关于如何将树操作为数据结构,而不一定是Spark模式 .