如何在pyspark中使用df.write.csv附加到csv文件?

我正在尝试使用 df.write.csv 将数据附加到我的csv文件中 . 这是我按照火花文件http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter后所做的:

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append'

执行上面的代码给我错误:

NameError:名称'append'未定义

没有附加,错误:

路径已存在 .

回答(3)

2 years ago

df.write.save(path='csv', format='csv', mode='append', sep='\t')

2 years ago

来自文档:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter自v1.4起

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

例如

from pyspark.sql import DataFrameWriter
.....
df1 = sqlContext.createDataFrame(query1)
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

如果要编写单个文件,可以在其中任何一行上使用coalesce或 repartition . 无论哪一行都没关系,因为数据帧只是一个DAG执行,在写入csv之前不会执行任何操作 . repartitioncoalesce 有效地使用相同的代码,但是coalesce只能减少 repartition 也可以增加它们的分区数量 . 为简单起见,我只是坚持 repartition .

例如

df1 = sqlContext.createDataFrame(query1).repartition(1)

要么

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append")

我认为文档中的示例并不好,它们没有显示使用路径以外的参数的示例 .

参考你尝试过的两件事:

(append)

为了实现这一点,需要一个名为append的字符串变量,其中包含值“append” . DataFrameWriter库中没有名为append的字符串常量 . 即你可以在代码中添加它,然后它就可以了 . append =“append”

('mode=append')

为了实现这一点,csv方法必须解析 mode=append 字符串以获取模式的值,当你只需要一个具有需要提取的值"append"或"overwrite"的参数时,这将是额外的工作 . 没有一个特殊情况,内置Python,不是特定于pyspark .

另外,我建议尽可能使用命名参数 . 例如

csv(path="/path/to/file.csv", mode="append")

而不是位置参数

csv("/path/to/file.csv", "append")

它更清晰,有助于理解 .

2 years ago

我不是关于Python,但是在Scala和Java中,可以通过以下方式设置保存模式:

df.write.mode("append").csv("pathToFile")

我认为它应该在Python中类似 . This可能会有所帮助 .