首页 文章

如何确定PySpark数据帧分区的“首选位置”?

提问于
浏览
0

我试图理解 coalesce 如何确定如何将初始分区加入到最终问题中,显然"preferred location"与它有关 .

根据this question,Scala Spark有一个函数 preferredLocations(split: Partition) 可以识别这个 . 但我并不熟悉Spark的Scala方面 . 有没有办法在PySpark级别确定给定行或分区ID的首选位置?

1 回答

  • 1

    是的,这在理论上是可行的 . 强制某种形式的偏好的示例数据(可能有一个更简单的例子):

    rdd1 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
    rdd2 = sc.range(10).map(lambda x: (x % 4, None)).partitionBy(8)
    
    # Force caching so downstream plan has preferences
    rdd1.cache().count()
    
    rdd3 = rdd1.union(rdd2)
    

    现在您可以定义一个帮助器:

    from pyspark import SparkContext
    
    def prefered_locations(rdd):
        def to_py_generator(xs):
            """Convert Scala List to Python generator"""
            j_iter = xs.iterator()
            while j_iter.hasNext():
                yield j_iter.next()
    
        # Get JVM
        jvm =  SparkContext._active_spark_context._jvm
        # Get Scala RDD
        srdd = jvm.org.apache.spark.api.java.JavaRDD.toRDD(rdd._jrdd)
        # Get partitions
        partitions = srdd.partitions()
        return {
            p.index(): list(to_py_generator(srdd.preferredLocations(p)))
            for p in partitions
        }
    

    应用:

    prefered_locations(rdd3)
    
    # {0: ['...'],
    #  1: ['...'],
    #  2: ['...'],
    #  3: ['...'],
    #  4: [],
    #  5: [],
    #  6: [],
    #  7: []}
    

相关问题