我使用Spark 1.6.0和Scala .
我想将DataFrame保存为压缩CSV格式 .
这是我到目前为止(假设我已经 df
和 sc
为 SparkContext
):
//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
df.write
.format("com.databricks.spark.csv")
.save(my_directory)
输出不是 gz
格式 .
4 回答
在spark-csv github上:https://github.com/databricks/spark-csv
人们可以读到:
在你的情况下,这应该工作:
df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')
此代码适用于Spark 2.1,其中
.codec
不可用 .对于Spark 2.2,您可以使用此处描述的
df.write.csv(...,codec="gzip")
选项:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec使用Spark 2.0,这变得有点简单:
您不再需要外部Databricks CSV软件包 .
csv()
编写器支持许多方便的选项 . 例如:sep
:设置分隔符 .quote
:是否以及如何引用值 .header
:是否包含 Headers 行 .除了
gzip
之外,您还可以使用许多其他压缩编解码器:bzip2
lz4
snappy
deflate
csv()
作者的完整Spark文档在这里:Python / Scala要编写带有 Headers 的CSV文件,并将part-000文件重命名为.csv.gzip
如果您不需要 Headers ,则将其设置为false,您也不需要进行合并 . 写作也会更快 .