首页 文章

将新数据附加到分区的镶木地板文件

提问于
浏览
13

我正在编写一个ETL过程,我需要读取每小时的日志文件,对数据进行分区并保存 . 我正在使用Spark(在Databricks中) . 日志文件是CSV,因此我阅读它们并应用模式,然后执行我的转换 .

我的问题是,如何将每小时的数据保存为镶木地板格式,但是附加到现有数据集?保存时,我需要按数据框中的4列进行分区 .

这是我的保存行:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

问题是如果目标文件夹存在,则save会抛出错误 . 如果目的地不存在,那么我不会附加我的文件 .

我尝试过使用 .mode("append") ,但我发现Spark有时会失败,所以我最终失去了我写入的数据量以及我还需要写多少 .

我正在使用镶木地板,因为分区大大增加了我将来的查询 . 同样,我必须将数据写为磁盘上的某种文件格式,并且不能使用像Druid或Cassandra这样的数据库 .

非常感谢有关如何划分我的数据帧和保存文件(坚持镶木地板或其他格式)的任何建议 .

2 回答

  • 0

    如果需要附加文件,则必须使用追加模式 . 我不知道你希望它生成多少个分区,但我发现如果你有很多分区, partitionBy 会导致很多问题(内存和IO问题都一样) .

    如果您认为您的问题是由写入操作耗时太长造成的,我建议您尝试以下两件事:

    1)通过添加配置使用snappy:

    conf.set("spark.sql.parquet.compression.codec", "snappy")
    

    2)在 SparkContext 上的 hadoopConfiguration 中禁用生成元数据文件,如下所示:

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    

    生成元数据文件会有点费时(参见this blog post),但根据this,它们实际上并不重要 . 就个人而言,我总是禁用它们,没有任何问题 .

    如果你生成了很多分区(> 500),我担心我能做的最好就是建议你研究一个不使用append-mode的解决方案 - 我根本就没有设法让 partitionBy 与那么多分区一起工作 .

  • 11

    如果您使用的是未分类的分区,那么您的数据将分散在所有分区中 . 这意味着每个任务都会为每个输出文件生成和写入数据 .

    在编写之前,请考虑根据分区列重新分区数据,以使每个输出文件的所有数据位于相同的分区上:

    data
     .filter(validPartnerIds($"partnerID"))
     .repartition([optional integer,] "partnerID","year","month","day")
     .write
     .partitionBy("partnerID","year","month","day")
     .parquet(saveDestination)
    

    见:DataFrame.repartition

相关问题