Commmunity!
请帮助我了解如何通过Spark获得更好的压缩比?
让我来描述一下案例:
-
我有数据集,让我们在HDFS上调用它 product ,这是使用编解码器 snappy 使用Sqoop ImportTool as-parquet-file导入的 . 作为导入的结果,我有100个文件,总共46 GB du,文件大小不同(最小11MB,最大1.5GB,平均~500MB) . 记录总数略高于8亿,有84列
-
我正在使用 snappy 对Spark进行简单的读取/重新分区/写入,结果我得到了:
~ 100 GB 输出大小与相同的文件数,相同的编解码器,相同的计数和相同的列 .
代码段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
- 使用镶木地板工具我已经查看了摄取和处理过的随机文件,它们如下所示:
摄取:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]
处理:
creator: parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
AVAILABLE: OPTIONAL INT64 R:0 D:1
...
row group 1: RC:6660100 TS:243047789 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]
另一方面,没有重新分区或使用合并 - 大小仍然接近摄取数据大小 .
-
继续前进,我做了以下事情:
-
读取数据集并将其写回
productDF
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "none")
.parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
- 读取数据集,重新分区并将其写回
productDF
.repartition(500)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "none")
.parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
结果: 80 GB 没有和 283 GB 重新分配相同的输出文件数
80GB实木复合地板示例:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
283 GB镶木地板的例子:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
看来,即使没有未压缩的数据,镶木地板(带编码?)也会大大减少数据的大小 . 怎么样 ? :)
我尝试读取未压缩的80GB,重新分区并回写 - 我已经获得了283 GB
-
对我来说,第一个问题是为什么我在火花重新分区/洗牌之后会变大?
-
第二个是如何有效地改组火花中的数据以获得镶木地板编码/压缩(如果有的话)?
一般来说,即使我没有改变任何东西,我也不希望我的数据大小在火花处理后增长 .
另外,我没有发现,是否有适合snappy的可配置压缩率,例如: -1 ... -9?据我所知,gzip有这个,但是在Spark / Parquet编写器中控制这个速率的方法是什么?
感谢任何帮助!
谢谢!