Commmunity!

请帮助我了解如何通过Spark获得更好的压缩比?

让我来描述一下案例:

  • 我有数据集,让我们在HDFS上调用它 product ,这是使用编解码器 snappy 使用Sqoop ImportTool as-parquet-file导入的 . 作为导入的结果,我有100个文件,总共46 GB du,文件大小不同(最小11MB,最大1.5GB,平均~500MB) . 记录总数略高于8亿,有84列

  • 我正在使用 snappy 对Spark进行简单的读取/重新分区/写入,结果我得到了:

~ 100 GB 输出大小与相同的文件数,相同的编解码器,相同的计数和相同的列 .

代码段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
  • 使用镶木地板工具我已经查看了摄取和处理过的随机文件,它们如下所示:

摄取:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

处理:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"

AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...

row group 1:                    RC:6660100 TS:243047789 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

另一方面,没有重新分区或使用合并 - 大小仍然接近摄取数据大小 .

  • 继续前进,我做了以下事情:

  • 读取数据集并将其写回

productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
  • 读取数据集,重新分区并将其写回
productDF
  .repartition(500)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "none")
  .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")

结果: 80 GB 没有和 283 GB 重新分配相同的输出文件数

80GB实木复合地板示例:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

283 GB镶木地板的例子:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

看来,即使没有未压缩的数据,镶木地板(带编码?)也会大大减少数据的大小 . 怎么样 ? :)

我尝试读取未压缩的80GB,重新分区并回写 - 我已经获得了283 GB

  • 对我来说,第一个问题是为什么我在火花重新分区/洗牌之后会变大?

  • 第二个是如何有效地改组火花中的数据以获得镶木地板编码/压缩(如果有的话)?

一般来说,即使我没有改变任何东西,我也不希望我的数据大小在火花处理后增长 .

另外,我没有发现,是否有适合snappy的可配置压缩率,例如: -1 ... -9?据我所知,gzip有这个,但是在Spark / Parquet编写器中控制这个速率的方法是什么?

感谢任何帮助!

谢谢!