首页 文章

DataFrame partitionBy到单个Parquet文件(每个分区)

提问于
浏览
33

我想修复/合并我的数据,以便将其保存到每个分区的一个Parquet文件中 . 我还想使用Spark SQL partitionBy API . 所以我可以这样做:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")

我已经测试了这个并且它似乎表现不佳 . 这是因为在数据集中只有一个分区可以处理,文件的所有分区,压缩和保存都必须由一个CPU内核完成 .

在调用coalesce之前,我可以重写这个来手动执行分区(使用带有不同分区值的过滤器) .

但是使用标准的Spark SQL API有更好的方法吗?

2 回答

  • 6

    我有完全相同的问题,我找到了一种方法来使用 DataFrame.repartition() . 使用 coalesce(1) 的问题在于您的并行性降至1,并且它最多可能很慢并且最坏时出错 . 增加这个数字也无济于事 - 如果你做了 coalesce(10) ,你会得到更多的并行性,但最终每个分区有10个文件 .

    要在不使用 coalesce() 的情况下为每个分区获取一个文件,请使用 repartition() ,并使用相同的列来对输出进行分区 . 所以在你的情况下,这样做:

    df.repartition("entity", "year", "month", "day", "status").write.partitionBy("entity", "year", "month", "day", "status").mode(SaveMode.Append).parquet(s"$location")
    

    一旦我这样做,我得到每个输出分区一个镶木地板文件,而不是多个文件 .

    我在Python中对此进行了测试,但我认为在Scala中它应该是相同的 .

  • 63

    根据定义:

    coalesce(numPartitions:Int):DataFrame返回一个具有正确numPartitions分区的新DataFrame .

    您可以使用它来使用numPartitions参数减少RDD / DataFrame中的分区数 . 在过滤大型数据集后,它对于更有效地运行操作非常有用 .

    关于你的代码,它表现不佳,因为你实际做的是:

    • 将所有内容放入1个分区,这会使驱动程序重载,因为它会将所有数据拉入驱动程序的1个分区(这也不是一个好习惯)

    • coalesce 实际上洗牌了网络上的所有数据,这也可能导致性能下降 .

    随机播放是Spark重新分发数据的机制,因此它可以跨分区进行不同的分组 . 这通常涉及跨执行程序和机器复制数据,使得混洗成为复杂且昂贵的操作 .

    shuffle 概念对于管理和理解非常重要 . 由于涉及磁盘I / O,数据序列化和网络I / O,因此它是一项昂贵的操作,因此总是最好将最小化进行洗牌 . 为了组织shuffle的数据,Spark生成了一系列任务 - 映射任务以组织数据,以及一组reduce任务来聚合它 . 这个术语来自MapReduce,并不直接与Spark的 Map 和减少操作相关 .

    在内部,各个 Map 任务的结果会保留在内存中,直到它们无法适应 . 然后,这些基于目标分区进行排序并写入单个文件 . 在reduce方面,任务读取相关的排序块 .

    关于镶木地板的分区,我建议您阅读有关Parquet Partitioning的Spark DataFrames的答案here以及 Performance Tuning 的Spark编程指南中的section .

    我希望这有帮助 !

相关问题