首页 文章

Apache Spark:为什么reduceByKey转换会执行DAG?

提问于
浏览
4

我面临一个奇怪的问题 . 据我所知,Spark中的操作DAG仅在执行操作时执行 . 但是,我可以看到reduceByKey()opertation(是一个转换)开始执行DAG .

重现步骤 . 尝试下面的代码

SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));

注意:文件的路径不应该是任何现有路径 . 换句话说,文件不应该存在 .

如果执行此代码,则没有按预期发生 . 但是,如果将以下行添加到程序并执行

mapToPair.reduceByKey((x, y) -> x + y);

它给出了以下例外:

Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

这意味着它已经开始执行DAG . 由于reduceByKey()是一个转换,因此在执行诸如collect()或take()之类的操作之前不应该这样 .

Spark版本:2.0.0 . 请提供您的建议 .

1 回答

  • 2

    这是因为,实际上不是DAG被执行(如:它的整个物化) .

    会发生什么是reduceByKey需要分区程序才能工作 . 如果您不提供一个,Spark会根据约定和默认值创建一个 . "default partiionner"作为代码中的以下注释:

    /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
    * If any of the RDDs already has a partitioner, choose that one.
    *
    * Otherwise, we use a default HashPartitioner. For the number of partitions, if
    * spark.default.parallelism is set, then we'll use the value from SparkContext
    * defaultParallelism, otherwise we'll use the max number of upstream partitions.
    *
    * Unless spark.default.parallelism is set, the number of partitions will be the
    * same as the number of partitions in the largest upstream RDD, as this should
    * be least likely to cause out-of-memory errors.
    *
    * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
    */
    

    该定义意味着,在某些情况下,计算来自所有上游RDD的分区数 . 在您的情况下,这意味着要求“文件系统”(可能是Hadoop,可能是本地的,......)来执行任何必要的操作(例如,对Hadoop文件系统的单个调用可以返回多个文件,每个文件也可以分割根据其InputFormat定义的各种优化,只能通过实际查找它们才能知道 .

    这就是在这里执行的内容,而不是实际的DAG(例如;你的map / flatMap / aggregate,......) .

    您可以通过在此按键变量中提供自己的分区程序来避免它:

    reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    

相关问题