我有一个场景,其中包括group by的一定数量的操作必须应用于许多小的(每个~300MB)文件 . 操作看起来像这样..
df.groupBy(....).agg(....)
现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,然而,它创建一个RDD并将其分区为操作 . 但是,看一下这些操作,如果文件是互斥的,那么它就是一个分组并涉及大量的shuffle,这是不必要的 .
我正在研究的是,我可以在文件上创建独立的RDD并独立操作它们 .
这是一个想法而不是一个完整的解决方案,我还没有测试过它 .
您可以从将数据处理管道提取到函数中开始 .
def pipeline(f: String, n: Int) = { sqlContext .read .format("com.databricks.spark.csv") .option("header", "true") .load(f) .repartition(n) .groupBy(...) .agg(...) .cache // Cache so we can force computation later }
如果您的文件很小,您可以调整 n 参数以使用尽可能少的分区来适应单个文件中的数据并避免混乱 . 这意味着你限制了并发性,但我们稍后会回到这个问题 .
n
val n: Int = ???
接下来,您必须获取输入文件列表 . 此步骤取决于数据源,但大多数情况下它或多或少是直接的:
val files: Array[String] = ???
接下来,您可以使用 pipeline 函数映射上面的列表:
pipeline
val rdds = files.map(f => pipeline(f, n))
由于我们通过提交多个作业来限制单个文件级别的并发性 . 让我们添加一个简单的帮助器,强制评估并用 Future 包裹它
Future
import scala.concurrent._ import ExecutionContext.Implicits.global def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future { df.rdd.foreach(_ => ()) // Force computation df }
最后我们可以在 rdds 上使用上面的帮助:
rdds
val result = Future.sequence( rdds.map(rdd => pipelineToFuture(rdd)).toList )
根据您的要求,您可以添加 onComplete 回调或使用反应流来收集结果 .
onComplete
如果你有很多文件,并且每个文件很小(你说300 MB以上,我认为它对Spark来说很小),你可以尝试使用 SparkContext.wholeTextFiles ,这将创建一个RDD,其中每个记录是一个完整的文件 .
SparkContext.wholeTextFiles
2 回答
这是一个想法而不是一个完整的解决方案,我还没有测试过它 .
您可以从将数据处理管道提取到函数中开始 .
如果您的文件很小,您可以调整
n
参数以使用尽可能少的分区来适应单个文件中的数据并避免混乱 . 这意味着你限制了并发性,但我们稍后会回到这个问题 .接下来,您必须获取输入文件列表 . 此步骤取决于数据源,但大多数情况下它或多或少是直接的:
接下来,您可以使用
pipeline
函数映射上面的列表:由于我们通过提交多个作业来限制单个文件级别的并发性 . 让我们添加一个简单的帮助器,强制评估并用
Future
包裹它最后我们可以在
rdds
上使用上面的帮助:根据您的要求,您可以添加
onComplete
回调或使用反应流来收集结果 .如果你有很多文件,并且每个文件很小(你说300 MB以上,我认为它对Spark来说很小),你可以尝试使用
SparkContext.wholeTextFiles
,这将创建一个RDD,其中每个记录是一个完整的文件 .