首页 文章

并行化GZip文件处理Spark

提问于
浏览
3

我有一个巨大的GZip文件列表,需要转换为Parquet . 由于GZip的压缩特性,这不能为一个文件并行化 .

但是,由于我有很多,是否有一种相对简单的方法让每个节点都能完成部分文件?这些文件在HDFS上 . 我假设我不能使用RDD基础结构来编写Parquet文件,因为这一切都是在驱动程序上完成的,而不是在节点本身上完成的 .

我可以并行化文件名列表,编写一个处理Parquets本地的函数并将它们保存回HDFS . 我不知道该怎么做 . 我觉得我错过了一些明显的东西,谢谢!

这被标记为重复的问题,但事实并非如此 . 我完全清楚Spark能够在不担心压缩的情况下将它们作为RDD读取,我的问题更多的是如何并行化将这些文件转换为结构化的Parquet文件 .

如果我知道如何在没有Spark本身的情况下与Parquet文件交互,我可以这样做:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    write_csv_to_parquet_on_hdfs(file_to)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

这将允许我将其并行化,但是我不知道如何从本地环境与HDFS和Parquet进行交互 . 我想知道:

1)怎么做

要么..

2)如何使用PySpark以不同的方式并行化此过程

1 回答

  • 0

    我建议采用以下两种方法中的一种(实际上我发现第一种方法在性能方面给出了更好的结果) .

    将每个Zip文件写入单独的Parquet文件

    在这里你可以使用 pyarrow 将一个Parquet文件写入HDFS:

    def convert_gzip_to_parquet(file_from, file_to):
        gzipped_csv = read_gzip_file(file_from)
        pyarrow_table = to_pyarrow_table(gzipped_csv)
        hdfs_client = pyarrow.HdfsClient()
        with hdfs_client.open(file_to, "wb") as f:
            pyarrow.parquet.write_table(pyarrow_table, f)
    
    # Filename RDD contains tuples with file_from and file_to
    filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))
    

    有两种方法可以获取pyarrow.Table对象:

    • 要么从pandas DataFrame中获取它(在这种情况下你也可以使用pandas的read_csv()函数): pyarrow_table = pyarrow.Table.from_pandas(pandas_df)

    • 或使用pyarrow.Table.from_arrays手动构建它

    要使pyarrow使用HDFS,需要正确设置多个环境变量,请参阅here

    将所有Zip-Files中的行连接到一个Parquet-File中

    def get_rows_from_gzip(file_from):
        rows = read_gzip_file(file_from)
        return rows
    
    # read the rows of each zip file into a Row object
    rows_rdd = filenameRDD.map(lambda x: get_rows_from_gzip(x[0]))
    
    # flatten list of lists
    rows_rdd = rows_rdd.flatMap(lambda x: x)
    
    # convert to DataFrame and write to Parquet
    df = spark_session.create_DataFrame(rows_rdd)
    df.write.parquet(file_to)
    

    如果您事先知道数据的架构,则将架构对象传递给 create_DataFrame 将加速DataFrame的创建 .

相关问题