我有一个巨大的GZip文件列表,需要转换为Parquet . 由于GZip的压缩特性,这不能为一个文件并行化 .
但是,由于我有很多,是否有一种相对简单的方法让每个节点都能完成部分文件?这些文件在HDFS上 . 我假设我不能使用RDD基础结构来编写Parquet文件,因为这一切都是在驱动程序上完成的,而不是在节点本身上完成的 .
我可以并行化文件名列表,编写一个处理Parquets本地的函数并将它们保存回HDFS . 我不知道该怎么做 . 我觉得我错过了一些明显的东西,谢谢!
这被标记为重复的问题,但事实并非如此 . 我完全清楚Spark能够在不担心压缩的情况下将它们作为RDD读取,我的问题更多的是如何并行化将这些文件转换为结构化的Parquet文件 .
如果我知道如何在没有Spark本身的情况下与Parquet文件交互,我可以这样做:
def convert_gzip_to_parquet(file_from, file_to):
gzipped_csv = read_gzip_file(file_from)
write_csv_to_parquet_on_hdfs(file_to)
# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))
这将允许我将其并行化,但是我不知道如何从本地环境与HDFS和Parquet进行交互 . 我想知道:
1)怎么做
要么..
2)如何使用PySpark以不同的方式并行化此过程
1 回答
我建议采用以下两种方法中的一种(实际上我发现第一种方法在性能方面给出了更好的结果) .
将每个Zip文件写入单独的Parquet文件
在这里你可以使用
pyarrow
将一个Parquet文件写入HDFS:有两种方法可以获取pyarrow.Table对象:
要么从pandas DataFrame中获取它(在这种情况下你也可以使用pandas的read_csv()函数):
pyarrow_table = pyarrow.Table.from_pandas(pandas_df)
或使用pyarrow.Table.from_arrays手动构建它
要使pyarrow使用HDFS,需要正确设置多个环境变量,请参阅here
将所有Zip-Files中的行连接到一个Parquet-File中
如果您事先知道数据的架构,则将架构对象传递给
create_DataFrame
将加速DataFrame的创建 .