所以我只有1个镶木地板文件我喜欢用100个分区处理它 . 我已经尝试将 spark.default.parallelism
设置为100,我们也尝试将镶木地板的压缩更改为无(来自gzip) . 无论我们做什么,火花作业的第一阶段只有一个分区(一旦发生洗牌,它就被重新划分为100,此后显然事情要快得多) .
现在根据一些消息来源(如下),镶木地板应该是可拆分的(即使使用gzip!),所以我非常困惑,并且会喜欢一些建议 .
我正在使用spark 1.0.0,显然 spark.sql.shuffle.partitions
的默认值是200,所以它可以't be that. In fact all the defaults for parallelism are much more than 1, so I don' t了解发生了什么 .
5 回答
您应该使用较小的块大小编写镶木地板文件 . 默认值为每块128Mb,但可以通过在writer中设置
parquet.block.size
配置进行配置 .The source of ParquetOuputFormat is here,如果你想深入了解细节 .
块大小是你可以从一个逻辑上可读的镶木地板文件中读出的最小数据量(因为镶木地板是柱状的,你不能只是按行分割或像这样简单的东西),所以你不能有更多的阅读线程比输入块 .
也许你的镶木地板文件只需要一个HDFS块 . 创建一个包含许多HDFS块并加载它的大型镶木地板文件
您将看到与HDFS块相同数量的分区 . 这对我来说很好(spark-1.1.0)
您已经提到要在写入镶木地板时控制分布 . 当您从RDD创建镶木地板时,会保留RDD的分区 . 因此,如果您创建RDD并指定100个分区,并且从带有镶木地板格式的数据框中,那么它将写入100个单独的镶木地板文件到fs . 对于读取,您可以指定
spark.sql.shuffle.partitions
参数 .要实现这一点,您应该使用
SparkContext
来设置Hadoop配置(sc.hadoopConfiguration
)属性mapreduce.input.fileinputformat.split.maxsize
.通过将此属性设置为低于hdfs.blockSize的值,您将获得与拆分数量一样多的分区 .
例如:
当
hdfs.blockSize
= 134217728(128MB)时,并读取一个文件,其中只包含一个完整的块,
和
mapreduce.input.fileinputformat.split.maxsize
= 67108864(64MB)然后会有两个分区,这些分区将被读入 .
这样做的新方法(Spark 2.x)正在设置
spark.sql.files.maxPartitionBytes
来源:https://issues.apache.org/jira/browse/SPARK-17998(官方文档还不正确,错过了.sql)
根据我的经验,Hadoop设置不再有效 .