首页 文章

使用Spark将CSV转换为镶木地板,保留分区

提问于
浏览
2

我正在尝试使用Spark将一堆csv文件转换为镶木地板,有趣的情况是输入的csv文件已经被目录“分区”了 . 所有输入文件都具有相同的列集 . 输入文件结构如下所示:

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

我想用Spark读取这些文件,并将它们的数据写入hdfs中的镶木桌,保留分区(按输入目录分区),例如每个分区有一个输出文件 . 输出文件结构应如下所示:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet

到目前为止,我发现的最佳解决方案是在输入目录之间循环,在数据帧中加载csv文件,并在镶木桌中的目标分区中写入数据帧 . 但这不高效,因为我想要每个分区一个输出文件,写入hdfs是阻止循环的单个任务 . 我想知道如何通过最大程度的并行性来实现这一目标(并且不需要对群集中的数据进行混洗) .

谢谢 !

2 回答

  • 0

    重命名输入目录,将 dirX 更改为 dir=dirX . 然后执行:

    spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output')
    

    如果无法重命名目录,则可以使用Hive Metastore . 为每个目录创建外部表和一个分区 . 然后加载此表并使用上面的模式重写 .

  • 0

    到目前为止我找到的最佳解决方案(没有洗牌和输入dirs的线程数量):

    • 创建一个输入目录的rdd,其中包含与输入目录一样多的分区

    • 将其转换为输入文件的rdd(通过dirs保留分区)

    • 使用自定义csv解析器对其进行平面映射

    • 将rdd转换为dataframe

    • 将数据帧写入由dirs分区的镶木桌

    它需要编写自己的解析器 . 我找不到使用sc.textfile或databricks csv解析器来保留分区的解决方案 .

相关问题