首页 文章

处理Spark中的大型gzip压缩文件

提问于
浏览
5

我有一个来自s3的大型(大约85 GB压缩)gzip压缩文件,我正在尝试使用AWS EMR上的Spark处理(现在有一个m4.xlarge主实例和两个m4.10xlarge核心实例,每个实例都有一个100 GB的EBS卷) . 我知道gzip是一种不可拆分的文件格式,并且应该重新对压缩文件进行重新分区,因为Spark最初给出了一个带有一个分区的RDD . 但是,做完之后

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()

并且看一下Spark应用程序UI,我仍然只看到一个活动的执行程序(其他14个已经死了)有一个任务,并且作业永远不会完成(或者至少我没有等待足够长的时间) .

  • 这里发生了什么?有人可以帮我理解Spark在这个例子中是如何工作的吗?

  • 我应该使用其他群集配置吗?

  • 不幸的是,我无法控制压缩模式,但有没有其他方法可以处理这样的文件?

3 回答

  • 0

    如果文件格式不可拆分,则无法避免在一个核心上完整地读取文件 . 为了并行化工作,您必须知道如何将工作块分配给不同的计算机 . 在gzip的情况下,假设您将其分成128M块 . 第n个块依赖于第n-1个块的位置信息来知道如何解压缩,这取决于n-2-nd块,依此类推到第一个块 .

    如果要并行化,则需要使此文件可拆分 . 一种方法是解压缩并解压缩它,或者你可以解压缩它,将它拆分成几个文件(每个并行任务一个文件),然后gzip每个文件 .

  • 6

    我遇到了这个问题,这是解决方案 .

    解决此问题的最佳方法是在我们的Spark批处理运行之前解压缩.gz文件 . 然后使用这个解压缩文件,之后我们可以使用Spark并行性 .

    用于解压缩.gz文件的代码 .

    import gzip
    import shutil
    with open('file.txt.gz', 'rb') as f_in, gzip.open('file.txt', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    
  • 0

    Spark可以并行读取单个gzip文件 .

    您可以做的最好的事情是将它们分解为gzip压缩块 .

    但是,Spark在阅读gzip文件方面确实很慢 . 你可以这样做来加快它:

    file_names_rdd = sc.parallelize(list_of_files, 100)
    lines_rdd = file_names_rdd.flatMap(lambda _: gzip.open(_).readlines())
    

    通过Python阅读本机Spark gzip阅读器的速度快了两倍 .

相关问题