首页 文章

为什么我的Spark DataFrame比RDD慢得多?

提问于
浏览
4

我有一个非常简单的Spark DataFrame,当运行DataFrame groupby时,性能非常糟糕 - 比(在我脑中)等效的RDD reduceByKey慢约8倍...

我的缓存DF只有两列,客户和名称只有5万行:

== Physical Plan ==
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None

当我运行以下两个片段时,我期望性能相似,不是rdd版本在10s运行而DF版本在85s运行...

rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect()

rawtempDF2.groupby('name').count().collect()

我错过了一些非常基本的东西吗? FWIW,RDD版本运行54个阶段,DF版本为227:/

编辑:我正在使用Spark 1.6.1和Python 3.4.2 . 编辑2:此外,源镶木地板是分区客户/日/名称 - 目前27客户,1天,c . 45个名字 .

1 回答

  • 8

    这两个数字似乎都相对较高,并且不清楚如何创建 DataFrame 或测量时间,但一般来说,这样的差异可以通过与分区数量相比较少的记录来解释 .

    spark.sql.shuffle.partitions 的默认值为200,表示您获得的任务数 . 使用50K记录时,启动任务的开销将高于从并行执行中获得的加速 . 让我们用一个简单的例子来说明 . 首先让我们创建一个示例数据:

    import string
    import random
    
    random.seed(323)
    
    def random_string():
      n = random.randint(3, 6)
      return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)), )
    
    df = (sc
        .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"])
        .cache())
    

    并根据 shuffle.partitions 的数量来衡量时间:

    sqlContext.setConf("spark.sql.shuffle.partitions", "1")
    %timeit -n 10  df.groupby('name').count().collect()
    ## 10 loops, best of 3: 504 ms per loop
    
    sqlContext.setConf("spark.sql.shuffle.partitions", "1")
    %timeit -n 10  df.groupby('name').count().collect()
    ## 10 loops, best of 3: 451 ms per loop
    
    sqlContext.setConf("spark.sql.shuffle.partitions", "100")
    %timeit -n 10  df.groupby('name').count().collect()
    ## 10 loops, best of 3: 624 ms per loop
    
    sqlContext.setConf("spark.sql.shuffle.partitions", "200")
    %timeit -n 10  df.groupby('name').count().collect()
    ## 10 loops, best of 3: 778 ms per loop
    
    sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
    %timeit -n 10  df.groupby('name').count().collect()
    ## 10 loops, best of 3: 1.75 s per loop
    

    虽然这些值与您声称的值不可比,并且此数据已在本地模式下收集,但您可以看到相对清晰的模式 . 这同样适用于RDD:

    from operator import add
    
    %timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect()
    ## 10 loops, best of 3: 414 ms per loop
    
    %timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect()
    ## 10 loops, best of 3: 439 ms per loop
    
    %timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect()
    ## 10 loops, best of 3: 1.3 s per loop
    
    %timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect()
    ## 10 loops, best of 3: 8.41 s per loop
    

    在适当的分布式环境中,由于网络IO的成本,这将更高 .

    仅供比较,让我们检查在没有Spark的情况下在本地执行此任务需要多长时间

    from collections import Counter
    
    data = df.rdd.flatMap(lambda x: x).collect()
    
    %timeit -n 10 Counter(data)
    ## 10 loops, best of 3: 9.9 ms per loop
    

    您还应该查看数据位置 . 根据您使用的存储和配置,即使使用这样的小输入,这也会给您的作业增加额外的延迟 .

相关问题