首页 文章

将多个文件并行处理为独立的RDD

提问于
浏览
3

我有一个场景,其中包括group by的一定数量的操作必须应用于许多小的(每个~300MB)文件 . 操作看起来像这样..

df.groupBy(....).agg(....)

现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,然而,它创建一个RDD并将其分区为操作 . 但是,看一下这些操作,如果文件是互斥的,那么它就是一个分组并涉及大量的shuffle,这是不必要的 .

我正在研究的是,我可以在文件上创建独立的RDD并独立操作它们 .

2 回答

  • 7

    这是一个想法而不是一个完整的解决方案,我还没有测试过它 .

    您可以从将数据处理管道提取到函数中开始 .

    def pipeline(f: String, n: Int) = {
        sqlContext
            .read
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .load(f)
            .repartition(n)
            .groupBy(...)
            .agg(...)
            .cache // Cache so we can force computation later
    }
    

    如果您的文件很小,您可以调整 n 参数以使用尽可能少的分区来适应单个文件中的数据并避免混乱 . 这意味着你限制了并发性,但我们稍后会回到这个问题 .

    val n: Int = ???
    

    接下来,您必须获取输入文件列表 . 此步骤取决于数据源,但大多数情况下它或多或少是直接的:

    val files: Array[String] = ???
    

    接下来,您可以使用 pipeline 函数映射上面的列表:

    val rdds = files.map(f => pipeline(f, n))
    

    由于我们通过提交多个作业来限制单个文件级别的并发性 . 让我们添加一个简单的帮助器,强制评估并用 Future 包裹它

    import scala.concurrent._
    import ExecutionContext.Implicits.global
    
    def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
        df.rdd.foreach(_ => ()) // Force computation
        df
    }
    

    最后我们可以在 rdds 上使用上面的帮助:

    val result = Future.sequence(
       rdds.map(rdd => pipelineToFuture(rdd)).toList
    )
    

    根据您的要求,您可以添加 onComplete 回调或使用反应流来收集结果 .

  • 0

    如果你有很多文件,并且每个文件很小(你说300 MB以上,我认为它对Spark来说很小),你可以尝试使用 SparkContext.wholeTextFiles ,这将创建一个RDD,其中每个记录是一个完整的文件 .

相关问题