我试图读取多个输入路径,并根据路径中的日期向数据框添加两列 . 实际上,文件存储为使用hive按这些日期分区的orc,因此它们具有类似 s3n://bucket_name/folder_name/partition1=value1/partition2=value2
的结构,其中partition2 = mg_load_date . 所以在这里我试图从多个路径获取多个目录,并且基于分区,我必须为每个spark数据帧创建两列,即mg_load_date和event_date . 我正在阅读这些作为输入并在我添加这两列之后将它们组合起来,分别找到每个文件的日期 .
有没有其他方法,因为我对每个文件有很多读取,一次读取所有文件,同时为其特定行添加两列 . 或者我可以快速进行读取操作的任何其他方式,因为我有很多读取 . 我想读这样的所有文件 sqlContext.read.format('orc').load(inputpaths)
比单独阅读它们然后合并它们要快 .
任何帮助表示赞赏 .
dfs = []
for i in input_paths:
df = sqlContext.read.format('orc').load(i)
date = re.search('mg_load_date=([^/]*)/$', i).group(1)
df = df.withColumn('event_date',F.lit(date)).withColumn('mg_load_date',F.lit(date))
dfs+=[df]
df = reduce(DataFrame.unionAll,dfs)
2 回答
正如@ user8371915所说,您应该从根路径加载数据,而不是传递子目录列表:
然后,您将可以访问分区列
partition1
和partition2
.如果由于某种原因无法从根路径加载,则可以尝试使用
pyspark.sql.functions
input_file_name
获取数据帧每行的文件名 .Spark 2.2.0
使用orc格式从多个文件夹中读取 .
ref:https://issues.apache.org/jira/browse/SPARK-12334