Spark: large task deserialization time

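I'm new to Spark and I seem to have some performance issues. I'm trying to compute simple correlations between different parameters in my DataFrame (I'm using PySpark on Spark 1.5.2 to do this), but the problem is that my task deserialization time is huge compared to the actual computation.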

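Here are screenshots taken while computing the correlation for two different pairs of parameters:

[Screenshot: time 1]

[Screenshot: time 2]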
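The stage pages in those screenshots are built from per-task metrics that Spark also exposes through its monitoring REST API, so the deserialization-to-compute ratio can be pulled out programmatically. This is a sketch under assumptions: it assumes the /api/v1/.../stages/<stage-id>/<attempt-id>/taskList endpoint (present since Spark 1.4) and the taskMetrics fields executorDeserializeTime, executorRunTime and jvmGcTime; the base URL, application id, stage id and the helper names are placeholders, and the field names should be checked against your Spark version.

```python
import json
from urllib.request import urlopen


def fetch_task_list(base_url, app_id, stage_id, attempt_id=0, length=1000):
    """Fetch per-task data for one stage attempt (hypothetical helper).

    Assumes Spark's REST endpoint .../stages/<stage>/<attempt>/taskList and that it
    accepts a 'length' query parameter (it returns only a small page by default).
    base_url is a placeholder such as "http://<driver-host>:4040".
    """
    url = "{}/api/v1/applications/{}/stages/{}/{}/taskList?length={}".format(
        base_url.rstrip("/"), app_id, stage_id, attempt_id, length)
    with urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def summarize_tasks(tasks):
    """Sum deserialization, run and GC time (ms) over tasks that report metrics."""
    totals = {"deserialize_ms": 0, "run_ms": 0, "gc_ms": 0}
    for task in tasks:
        metrics = task.get("taskMetrics")
        if not metrics:  # tasks that are still running or failed may lack metrics
            continue
        totals["deserialize_ms"] += metrics.get("executorDeserializeTime", 0)
        totals["run_ms"] += metrics.get("executorRunTime", 0)
        totals["gc_ms"] += metrics.get("jvmGcTime", 0)
    run = totals["run_ms"]
    # Ratio > 1 means tasks spent longer being deserialized than running.
    totals["deserialize_to_run"] = totals["deserialize_ms"] / run if run else None
    return totals
```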

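To compute a correlation I simply use full_dataframe.stat.corr('param1', 'param2'). I cache the dataset first and then run this computation. I call this line inside a loop that iterates over the different parameter combinations. The cached dataset is 5.2 GB.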
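The loop described above might look roughly like the sketch below; correlations_in_loop, correlations_one_pass and param_pairs are hypothetical names, and columns stands for the list of parameter columns. Each stat.corr call launches its own Spark job, so N parameters mean N*(N-1)/2 separate jobs, each scheduling and deserializing its tasks again. The second function is an assumption, not something from the question: it computes every pair in a single aggregation with pyspark.sql.functions.corr, which was added in Spark 1.6 and is therefore not available on the 1.5.2 used here.

```python
from itertools import combinations


def param_pairs(columns):
    """All unordered pairs of column names, in a stable order."""
    return list(combinations(columns, 2))


def correlations_in_loop(full_dataframe, columns):
    """One stat.corr call (and therefore one Spark job) per pair, as in the question."""
    full_dataframe.cache()
    return {(a, b): full_dataframe.stat.corr(a, b) for a, b in param_pairs(columns)}


def correlations_one_pass(full_dataframe, columns):
    """All pairwise Pearson correlations in a single aggregation job.

    Assumes Spark 1.6+, where pyspark.sql.functions.corr exists.
    """
    from pyspark.sql import functions as F

    pairs = param_pairs(columns)
    exprs = [F.corr(a, b).alias("corr_{}".format(i)) for i, (a, b) in enumerate(pairs)]
    row = full_dataframe.agg(*exprs).first()
    # Aggregate columns come back in the same order as exprs, so index by position.
    return {pair: row[i] for i, pair in enumerate(pairs)}
```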

I'm running this job on 4 machines (YARN), each with:

  • 10 GB RAM (8 GB reserved for YARN)

  • 8 cores (16 virtual cores, 14 reserved for YARN)

I'm using PySpark through Jupyter, which I start with:

pyspark --master yarn --driver-memory 2560m --num-executors 4 --executor-cores 4 --executor-memory 5G --conf spark.yarn.executor.memoryOverhead=2048
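As a rough check of how these settings fit the nodes described above, the container arithmetic can be written out. This sketch assumes YARN sizes each executor container as the executor memory plus spark.yarn.executor.memoryOverhead, rounded up to yarn.scheduler.minimum-allocation-mb (1024 MB by default); the helper names are hypothetical.

```python
import math


def container_mb(executor_memory_mb, overhead_mb, min_allocation_mb=1024):
    """Memory YARN allocates per executor: heap plus overhead, rounded up to the minimum allocation."""
    requested = executor_memory_mb + overhead_mb
    return int(math.ceil(requested / float(min_allocation_mb))) * min_allocation_mb


def executors_per_node(node_yarn_mb, node_yarn_vcores, container_memory_mb, executor_cores):
    """Executor containers that fit on one node, limited by memory and by vcores.

    Assumption: vcores only constrain placement if the scheduler uses the
    DominantResourceCalculator; otherwise memory alone decides.
    """
    return min(node_yarn_mb // container_memory_mb, node_yarn_vcores // executor_cores)


# Values from the question: 5G heap, 2048 MB overhead, 4 cores per executor,
# 8 GB and 14 vcores reserved for YARN on each of the 4 nodes.
per_container = container_mb(5 * 1024, 2048)
per_node = executors_per_node(8 * 1024, 14, per_container, 4)
print(per_container, per_node, 4 * per_node * 4)  # MB per container, executors per node, total task slots
```

With these numbers each executor container needs 7168 MB, so only one fits in the 8 GB YARN reserves per node, which matches the 4 executors requested and gives 16 concurrent task slots in total.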

I've tried different numbers of partitions with df.repartition(no_of_partitions), e.g. 16, 32, 128 and 256, but none of them helped.
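For scale, the sketch below works out what those partition counts mean for the 5.2 GB cached dataset and the 16 task slots implied by --num-executors 4 --executor-cores 4. It assumes the cached data is spread evenly over partitions (repartition balances rows, not bytes) and one task per core at a time; the helper names are hypothetical.

```python
import math


def partition_size_mb(dataset_gb, num_partitions):
    """Average cached size per partition, assuming an even split."""
    return dataset_gb * 1024 / num_partitions


def task_waves(num_partitions, num_executors, executor_cores):
    """Rounds of tasks needed when each core runs one task at a time."""
    slots = num_executors * executor_cores
    return int(math.ceil(num_partitions / float(slots)))


for n in (16, 32, 128, 256):
    print("{:>3} partitions: ~{:.1f} MB each, {} wave(s) of tasks".format(
        n, partition_size_mb(5.2, n), task_waves(n, 4, 4)))
```

Every task in every wave is deserialized separately on its executor, so more partitions also means more deserialization steps.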

Also, after a while my job breaks completely and I get the following error in the UI:

HTTP ERROR 500

Problem accessing /proxy/application_1485432889177_0016/stages/stage. Reason:

    Connection to http://192.168.84.27:4040 refused
Caused by:

org.apache.http.conn.HttpHostConnectException: Connection to http://192.168.84.27:4040 refused

When I look at the output in Jupyter, I see the following exception:

17/01/29 17:06:06 ERROR Utils: Uncaught exception in thread task-result-getter-14
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:386)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
Exception in thread "task-result-getter-13" 17/01/29 17:06:06 ERROR Utils: Uncaught exception in thread task-result-getter-15
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:386)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:386)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
Exception in thread "task-result-getter-12" Exception in thread "task-result-getter-14" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.trim(ByteVectorImpl.java:70)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:386)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)

What could be causing this behavior, and how can I fix it?


After changing the minimum and maximum split sizes, I get a similar GC error in the driver:

17/01/29 21:37:43 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-18] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.resize(ByteVectorImpl.java:84)
        at sun.reflect.ByteVectorImpl.add(ByteVectorImpl.java:63)
        at sun.reflect.ClassFileAssembler.emitByte(ClassFileAssembler.java:74)
        at sun.reflect.ClassFileAssembler.emitShort(ClassFileAssembler.java:63)
        at sun.reflect.ClassFileAssembler.emitConstantPoolNameAndType(ClassFileAssembler.java:120)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:313)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mcV$sp(TorrentBroadcast.scala:162)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        at org.apache.spark.broadcast.TorrentBroadcast.writeObject(TorrentBroadcast.scala:160)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
17/01/29 21:37:46 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.resize(ByteVectorImpl.java:84)
        at sun.reflect.ByteVectorImpl.add(ByteVectorImpl.java:63)
        at sun.reflect.ClassFileAssembler.emitByte(ClassFileAssembler.java:74)
        at sun.reflect.ClassFileAssembler.emitShort(ClassFileAssembler.java:63)
        at sun.reflect.ClassFileAssembler.emitConstantPoolNameAndType(ClassFileAssembler.java:120)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:313)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mcV$sp(TorrentBroadcast.scala:162)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        at org.apache.spark.broadcast.TorrentBroadcast.writeObject(TorrentBroadcast.scala:160)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
17/01/29 21:37:50 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-17] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.ByteVectorImpl.resize(ByteVectorImpl.java:84)
        at sun.reflect.ByteVectorImpl.add(ByteVectorImpl.java:63)
        at sun.reflect.ClassFileAssembler.emitByte(ClassFileAssembler.java:74)
        at sun.reflect.ClassFileAssembler.emitShort(ClassFileAssembler.java:63)
        at sun.reflect.ClassFileAssembler.emitConstantPoolNameAndType(ClassFileAssembler.java:120)
        at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:313)
        at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)
        at sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:340)
        at java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
        at java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mcV$sp(TorrentBroadcast.scala:162)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
        at org.apache.spark.broadcast.TorrentBroadcast.writeObject(TorrentBroadcast.scala:160)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:50 WARN YarnHistoryService: Discarding event
17/01/29 21:37:54 ERROR ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver]

And in the UI:

HTTP ERROR 500

Problem accessing /jobs/. Reason:

    Server Error
Caused by:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:3332)
