首页 文章

SPARK中的自定义分区程序(pyspark)

提问于
浏览
1

我正在尝试使用PySpark在spark作业中创建一个自定义分区器,比方说,我有一些整数列表 [10,20,30,40,50,10,20,35] . 现在我想要一个场景,我有两个分区,如 p1p2 . p1 包含所有列表元素<30和 p2 包含30以上的所有元素 .

elements = sc.parallelize([10,20,30,40,50,10,20,35]).map(lambda x : (float(x)/10,x)).partitionBy(2).glom().collect()

上面的代码根据我传递的任意键的 hash 对列表进行分区 . 无论如何根据特定场景划分列表?就像 Value 小于x或类似的东西?

2 回答

  • 2

    首先使用密钥将您的条目映射为((10,10),(20,20))以获得组合键值对 . 然后使用自定义分区程序,它将根据分区元素的键值来决定 .

    尝试在python中实现这样的类:

    class ExactPartitioner[V](partitions: Int, elements: Int) extends Partitioner {
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        //0 and 1 are partition number
        return ( k < 30 )? 0 : 1
      }
    }
    
  • 2

    撇开FaigB的答案,如果值高于阈值,则需要分区,而不是值本身 . 这是它在python中的外观

    rdd = sc.parallelize([10,20,30,40,50,10,20,35]).map(lambda x : (float(x)/10, float(x)/10))
    elements = rdd.partitionBy(2,lambda x: int(x > 3)).map(lambda x: x[0]).glom().collect()
    elements
    

    结果如何

    [[1.0, 2.0, 3.0, 1.0, 2.0], [4.0, 5.0, 3.5]]
    

相关问题