首页 文章

HashPartitioner如何运作?

提问于
浏览
66

我读了HashPartitioner的文档 . 不幸的是,除了API调用之外没有解释太多 . 我假设 HashPartitioner 基于密钥的散列对分布式集合进行分区 . 例如,如果我的数据是这样的

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

因此,分区器会将其放入不同的分区,同一个键落在同一个分区中 . 但是我不明白构造函数参数的意义

new HashPartitoner(numPartitions) //What does numPartitions do?

对于上述数据集,如果我这样做,结果会有何不同

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

那么 HashPartitioner 如何实际工作呢?

3 回答

  • 3

    好吧,让我们让你的数据集更有趣:

    val rdd = sc.parallelize(for {
        x <- 1 to 3
        y <- 1 to 2
    } yield (x, None), 8)
    

    我们有六个要素:

    rdd.count
    
    Long = 6
    

    没有分区:

    rdd.partitioner
    
    Option[org.apache.spark.Partitioner] = None
    

    和八个分区:

    rdd.partitions.length
    
    Int = 8
    

    现在让我们定义小助手来计算每个分区的元素数量:

    import org.apache.spark.rdd.RDD
    
    def countByPartition(rdd: RDD[(Int, None.type)]) = {
        rdd.mapPartitions(iter => Iterator(iter.length))
    }
    

    由于我们没有分区器,因此我们的数据集在分区之间均匀分布(Default Partitioning Scheme in Spark):

    countByPartition(rdd).collect()
    
    Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
    

    inital-distribution

    现在让我们重新分区我们的数据集:

    import org.apache.spark.HashPartitioner
    val rddOneP = rdd.partitionBy(new HashPartitioner(1))
    

    由于传递给 HashPartitioner 的参数定义了我们期望一个分区的分区数:

    rddOneP.partitions.length
    
    Int = 1
    

    由于我们只有一个分区,因此它包含所有元素:

    countByPartition(rddOneP).collect
    
    Array[Int] = Array(6)
    

    hash-partitioner-1

    请注意,shuffle之后的值的顺序是不确定的 .

    如果我们使用 HashPartitioner(2) 也一样

    val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
    

    我们将获得2个分区:

    rddTwoP.partitions.length
    
    Int = 2
    

    由于 rdd 被密钥数据分区,因此将不再均匀分布:

    countByPartition(rddTwoP).collect()
    
    Array[Int] = Array(2, 4)
    

    因为有三个键,只有两个不同的值 hashCode mod numPartitions ,这里没有任何意外:

    (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
    
    scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
    

    只是为了确认以上内容:

    rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
    
    Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
    

    hash-partitioner-2

    最后用 HashPartitioner(7) 我们得到七个分区,三个非空,每个分区有2个元素:

    val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
    rddSevenP.partitions.length
    
    Int = 7
    
    countByPartition(rddTenP).collect()
    
    Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
    

    hash-partitioner-7

    摘要和注释

    • HashPartitioner 采用一个定义分区数的参数
      使用 hash 键将
    • 值分配给分区 . hash 函数可能因语言而异(Scala RDD可能使用 hashCodeDataSets 使用MurmurHash 3,PySpark,portable_hash) .

    在这种简单的情况下,key是一个小整数,你可以假设 hash 是一个标识( i = hash(i) ) .

    Scala API使用nonNegativeMod来确定基于计算哈希的分区,

    Java数组的hashCodes基于数组的标识而不是其内容,因此尝试使用HashPartitioner对RDD [Array []]或RDD [(Array [],_)]进行分区会产生意外或不正确的结果 .

  • 5

    RDD 是分布式的,这意味着它被分成若干个部分 . 每个分区都可能位于不同的计算机上 . 带有参数 numPartitions 的哈希分区器选择以下列方式放置对 (key, value) 的分区:

    • 准确创建 numPartitions 分区 .

    • (key, value) 放在分区中,编号为 Hash(key) % numPartitions

  • 122

    HashPartitioner.getPartition 方法将键作为其参数,并返回键所属的分区的索引 . 分区程序必须知道有效索引是什么,因此它返回正确范围内的数字 . 分区数通过 numPartitions 构造函数参数指定 .

    实现返回大致 key.hashCode() % numPartitions . 有关详细信息,请参阅Partitioner.scala .

相关问题