首页 文章

如何在 Spark SQL 中控制分区大小

提问于
浏览
19

我需要使用 Spark SQL HiveContext从 Hive 表中加载数据并加载到 HDFS 中。默认情况下,SQL 输出中的DataFrame具有 2 个分区。为了获得更多的并行性,我需要在 SQL 中增加分区。 HiveContex t 中没有重载方法来获取分区数参数。

RDD 的重新分区会导致改组并导致更多的处理时间。


val 结果= sqlContext.sql(“从 bt_st_ent 中选择*”)

 
Has the log output of:
 
```java
Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)

我想知道有什么方法可以增加 SQL 输出的分区大小。

3 回答

  • 9

    火花<2.0

    您可以使用 Hadoop 配置选项:

    • mapred.min.split.size .

    • mapred.max.split.size

    以及 HDFS 块大小,以控制基于文件系统格式*的分区大小。

    val minSplit: Int = ???
    val maxSplit: Int = ???
    
    sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
    sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)
    

    火花 2.0

    您可以使用spark.sql.files.maxPartitionBytes配置:

    spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)
    

    在这两种情况下,特定的数据源 API 可能都未使用这些值,因此您应始终检查所使用格式的文档/实现详细信息。


    • 其他输入格式可以使用不同的设置。例如看

    此外,从RDDs创建的Datasets将继承其父级的分区布局。

    类似地,存储桶表将使用元存储中定义的存储桶布局,其中存储桶和Dataset分区之间具有 1:1 关系。

  • 4

    一个非常常见且痛苦的问题。您应该寻找一个将数据分布在统一分区中的键。您可以使用DISTRIBUTE BYCLUSTER BY运算符告诉 spark 将分区中的行分组。这将在查询本身上产生一些开销。但是会导致分区大小均匀。 深度感对此有很好的教程。

  • 1

    如果您的 SQL 执行改组(例如,它具有联接或某种分组依据),则可以通过设置'spark.sql.shuffle.partitions'属性来设置分区数

    sqlContext.setConf( "spark.sql.shuffle.partitions", 64)
    

    按照 Fokko 的建议,可以使用随机变量进行聚类。

    val result = sqlContext.sql("""
       select * from (
         select *,random(64) as rand_part from bt_st_ent
       ) cluster by rand_part""")
    

相关问题