首页 文章

从多个输入路径读取数据帧并同时添加列

提问于
浏览
1

我试图读取多个输入路径,并根据路径中的日期向数据框添加两列 . 实际上,文件存储为使用hive按这些日期分区的orc,因此它们具有类似 s3n://bucket_name/folder_name/partition1=value1/partition2=value2 的结构,其中partition2 = mg_load_date . 所以在这里我试图从多个路径获取多个目录,并且基于分区,我必须为每个spark数据帧创建两列,即mg_load_date和event_date . 我正在阅读这些作为输入并在我添加这两列之后将它们组合起来,分别找到每个文件的日期 .

有没有其他方法,因为我对每个文件有很多读取,一次读取所有文件,同时为其特定行添加两列 . 或者我可以快速进行读取操作的任何其他方式,因为我有很多读取 . 我想读这样的所有文件 sqlContext.read.format('orc').load(inputpaths) 比单独阅读它们然后合并它们要快 .

任何帮助表示赞赏 .

dfs = []
for i in input_paths:
    df = sqlContext.read.format('orc').load(i)  
    date = re.search('mg_load_date=([^/]*)/$', i).group(1)
    df = df.withColumn('event_date',F.lit(date)).withColumn('mg_load_date',F.lit(date))
    dfs+=[df]
df = reduce(DataFrame.unionAll,dfs)

2 回答

  • 0

    正如@ user8371915所说,您应该从根路径加载数据,而不是传递子目录列表:

    sqlContext.read.format('orc').load("s3n://bucket_name/folder_name/")
    

    然后,您将可以访问分区列 partition1partition2 .

    如果由于某种原因无法从根路径加载,则可以尝试使用 pyspark.sql.functions input_file_name 获取数据帧每行的文件名 .

  • 0

    Spark 2.2.0

    使用orc格式从多个文件夹中读取 .

    df=spark.read.orc([path1,path2])
    

    ref:https://issues.apache.org/jira/browse/SPARK-12334

相关问题