首页 文章

如何将DataFrame保存为压缩(gzip)CSV?

提问于
浏览
11

我使用Spark 1.6.0和Scala .

我想将DataFrame保存为压缩CSV格式 .

这是我到目前为止(假设我已经 dfscSparkContext ):

//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

df.write
  .format("com.databricks.spark.csv")
  .save(my_directory)

输出不是 gz 格式 .

4 回答

  • 2

    在spark-csv github上:https://github.com/databricks/spark-csv

    人们可以读到:

    编解码器:保存到文件时使用的压缩编解码器 . 应该是实现org.apache.hadoop.io.compress.CompressionCodec的类的完全限定名称,或者是一个不区分大小写的缩短名称(bzip2,gzip,lz4和snappy) . 未指定编解码器时,默认为无压缩 .

    在你的情况下,这应该工作: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

  • 18

    此代码适用于Spark 2.1,其中 .codec 不可用 .

    df.write
      .format("com.databricks.spark.csv")
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .save(my_directory)
    

    对于Spark 2.2,您可以使用此处描述的 df.write.csv(...,codec="gzip") 选项:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec

  • 5

    使用Spark 2.0,这变得有点简单:

    df.write.csv("path", compression="gzip")
    

    您不再需要外部Databricks CSV软件包 .

    csv() 编写器支持许多方便的选项 . 例如:

    • sep :设置分隔符 .

    • quote :是否以及如何引用值 .

    • header :是否包含 Headers 行 .

    除了 gzip 之外,您还可以使用许多其他压缩编解码器:

    • bzip2

    • lz4

    • snappy

    • deflate

    csv() 作者的完整Spark文档在这里:Python / Scala

  • 9

    要编写带有 Headers 的CSV文件,并将part-000文件重命名为.csv.gzip

    DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite")
    .option("header","true")
    .option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName)
    
    copyRename(tempLocationFileName, finalLocationFileName)
    
    def copyRename(srcPath: String, dstPath: String): Unit =  {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
      // the "true" setting deletes the source files once they are merged into the new output
    }
    

    如果您不需要 Headers ,则将其设置为false,您也不需要进行合并 . 写作也会更快 .

相关问题