I have a job that writes to S3 after every iteration. I am writing in CSV format (gzip-compressed). Even though I overwrite the location, the job fails after the first iteration with an error saying the file already exists. I tried append as well, but I still run into the same problem. The code looks like this:

vdna_report_table_tmp.coalesce(2).write.save(path='s3://analyst-adhoc/elevate/tempData/VDNA_BRANDSURVEY_REPORT_TABLE_tmp/', format='csv', sep='|', compression='gzip', header=False, mode='overwrite')

And the error looks like this:

Caused by: java.io.IOException: File already exists:s3://analyst-adhoc/elevate/tempData/VDNA_BRANDSURVEY_REPORT_TABLE_tmp/part-r-00001-69d1e948-c609-42b7-962e-451a23bbd3b3.csv.gz
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:613)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:178)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVRelation.scala:200)
    at org.apache.spark.sql.execution.datasources.csv.CSVOutputWriterFactory.newInstance(CSVRelation.scala:170)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

I am using pyspark 2.0.0. I also tried writing everything out as parquet, and it fails the same way.
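
For reference, the parquet attempt was essentially the same call with the format swapped (just a sketch of what I ran; the path is the same one as above):

vdna_report_table_tmp.coalesce(2).write.save(path='s3://analyst-adhoc/elevate/tempData/VDNA_BRANDSURVEY_REPORT_TABLE_tmp/', format='parquet', mode='overwrite')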

In summary, I am doing the following steps:

  • Read data from one location (let's call it A)

  • Get more data from another location (let's call it B)

  • Union A and B

  • Write the result back to location A (I tried both append and overwrite, and both fail saying the file already exists); a condensed sketch of the whole loop follows below
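
Here is a condensed sketch of the loop, to make the steps concrete (path_b and num_iterations are placeholders I made up for this sketch; everything else mirrors the real job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

path_a = 's3://analyst-adhoc/elevate/tempData/VDNA_BRANDSURVEY_REPORT_TABLE_tmp/'  # location A (the real path)
path_b = 's3://analyst-adhoc/elevate/tempData/SOURCE_B/'  # location B (hypothetical path)
num_iterations = 10  # arbitrary, just for the sketch

for i in range(num_iterations):
    df_a = spark.read.csv(path_a, sep='|', header=False)  # read data from location A
    df_b = spark.read.csv(path_b, sep='|', header=False)  # get more data from location B
    combined = df_a.union(df_b)                           # union A and B
    # write back to location A: this is the step that fails after the first
    # iteration with "File already exists", with both 'overwrite' and 'append'
    combined.coalesce(2).write.save(path=path_a, format='csv', sep='|',
                                    compression='gzip', header=False,
                                    mode='overwrite')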