
AWS Glue job fails on large input CSV data on S3


For smaller S3 input files (~10 GB) the Glue ETL job works fine, but for a larger dataset (~200 GB) the job fails.

Adding part of the ETL code:

# Converting Dynamic frame to dataframe
df = dropnullfields3.toDF()

# create new partition column
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))

# store the data in parquet format on s3 
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")

The job ran for 4 hours and then raised this error:

File "script_2017-11-23-15-07-32.py", line 49, in <module>
    partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")
File ".../pyspark.zip/pyspark/sql/readwriter.py", line 550, in save
File ".../py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File ".../pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File ".../py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o172.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    ...
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
    ...
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 3385 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    ...
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    ...
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127)
    ... 30 more
End of LogType: stdout

I would appreciate any guidance on how to resolve this issue.

1 Answer


    Configurable options such as maxResultSize can only be set when the Spark context is instantiated, and Glue provides the context for you (from memory, I don't believe you can change the value of this property).
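
    For comparison, in a standalone Spark job where you create the context yourself, the property would be set at construction time, roughly as in the sketch below (the "2g" value is only an illustrative number, not a recommendation):

    from pyspark import SparkConf, SparkContext

    # Only possible when you build the context yourself; in a Glue job the
    # context is handed to you, so this setting is effectively out of reach.
    conf = SparkConf().set("spark.driver.maxResultSize", "2g")
    sc = SparkContext(conf=conf)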

    You would normally get this error if you collected results larger than the specified size back to the driver. You aren't doing that in this case, so the error is confusing.
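
    The classic trigger looks something like the line below (a hypothetical illustration; the posted script does not do this):

    # Pulling every row of a large DataFrame back to the driver is what
    # normally blows past spark.driver.maxResultSize.
    all_rows = partitioned_dataframe.collect()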

    It looks like you are generating 3385 tasks, which presumably relate to the dates in your input file (3385 dates, ~9 years?). You could try writing this file in batches, for example:

    from pyspark.sql.functions import col, year

    partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))
    for yr in range(2000, 2018):
        # write one year at a time so each job handles a smaller slice
        batch = partitioned_dataframe.where(year(col('part_date')) == yr)
        batch.write.partitionBy(['part_date']) \
            .format("parquet") \
            .save(output_lg_partitioned_dir, mode="append")
    

    I haven't checked this code; at the very least it needs year from pyspark.sql.functions imported for the date filter to work.

    When I have used Glue for data processing, I have found batching the work like this more effective than trying to get a large dataset to complete in one go. The system is good but hard to debug; stability with big data does not come easily.
