I am trying to generate hierarchy information based on this example: https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/

After adapting the code as needed, it works fine when I run it locally, but in EMR I get an NPE.

val hrchyRDD = initialGraph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(setMsg, sendMsg, mergeMsg)
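For context, the three functions passed to that call are shaped roughly like the sketch below, condensed from the blog post. The vertex payload is simplified here to a (level, path) pair and the edge attribute is assumed to be a String; the real job carries more fields, so treat the names and types as placeholders, not my actual code.

import org.apache.spark.graphx._

// Simplified vertex payload: (hierarchy level, materialized path).
// The blog's version also tracks root, isCyclic, isLeaf, etc.
type VData = (Int, String)

val initialMsg: VData = (0, "")

// vprog: fold an incoming message into the vertex state.
def setMsg(id: VertexId, attr: VData, msg: VData): VData =
  if (msg == initialMsg) attr // first superstep: keep the seeded state
  else msg                    // later supersteps: adopt the parent's level/path

// sendMsg: push (level + 1, path + "/" + child) down every out-edge.
def sendMsg(t: EdgeTriplet[VData, String]): Iterator[(VertexId, VData)] =
  Iterator((t.dstId, (t.srcAttr._1 + 1, t.srcAttr._2 + "/" + t.dstId)))

// mergeMsg: each vertex has a single parent in a tree, so either message works.
def mergeMsg(a: VData, b: VData): VData = a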

I have also tried giving it extra memory, reducing the number of executors, and so on, but to no avail.
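Concretely, the runs varied the resource settings along the lines of this sketch (the values match the conf dump that follows; on EMR they were actually passed as spark-shell flags, and the app name is a placeholder):

import org.apache.spark.sql.SparkSession

// Illustrative only: spark.driver.memory normally has to be set before the
// driver JVM starts (i.e. on the command line); it is shown as a .config
// call here just for readability. "hierarchy-pregel" is a made-up app name.
val spark = SparkSession.builder()
  .appName("hierarchy-pregel")
  .config("spark.driver.memory", "5g")
  .config("spark.executor.memory", "7g")
  .config("spark.executor.cores", "4")
  .config("spark.executor.instances", "6")
  .config("spark.default.parallelism", "2560")
  .getOrCreate()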

scala> sc.getConf.getAll;
res3: Array[(String, String)] = Array((spark.eventLog.enabled,true), 
(spark.app.id,application_1527179090729_0015), (spark.driver.extraLibraryPath,/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native), (spark.default.parallelism,2560), (spark.blacklist.decommissioning.timeout,1h), (spark.yarn.secondary.jars,bcpg-jdk15on-158.jar,bcprov-jdk15on-158.jar,aws-encryption-sdk-java-1.3.1.jar), (spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS,$(hostname -f)), (spark.driver.memory,5g), (spark.driver.port,42783), (spark.executor.cores,4), (spark.yarn.historyServer.address,ip-10-0-35-88.ec2.internal:18080), (spark.repl.class.uri,spark://10.0.35.88:42783/classes), (spark.executor.instances,6), (spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://ip-10-0-35-88....

sc.getConf.get("spark.executor.memory")
res4: String = 7g

sc.getConf.get("spark.executor.instances")
res5: String = 6

Stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 145 in stage 3.0 failed 4 times, most recent failure: Lost task 145.3 in stage 3.0 (TID 1340, ip-10-0-36-20.ec2.internal, executor 33): java.lang.NullPointerException
  at $anonfun$5.apply(<console>:61)
  at $anonfun$5.apply(<console>:61)
  at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$mapVertices$1.apply(GraphImpl.scala:136)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$mapVertices$1.apply(GraphImpl.scala:136)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1039)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1030)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:970)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1030)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:761)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
  at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:90)
  at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:140)
  at org.apache.spark.graphx.GraphOps.pregel(GraphOps.scala:370)
  at calcTopLevelHierarchy(<console>:62)
  ... 54 elided
Caused by: java.lang.NullPointerException
  at $anonfun$5.apply(<console>:61)
  at $anonfun$5.apply(<console>:61)
  at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$mapVertices$1.apply(GraphImpl.scala:136)
  at org.apache.spark.graphx.impl.GraphImpl$$anonfun$mapVertices$1.apply(GraphImpl.scala:136)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1039)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1030)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:970)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1030)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:761)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
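The innermost frames point at the anonymous function defined by a mapVertices call at <console>:61, so the NPE is thrown from the vertex lambda itself rather than from Spark internals. One way I understand the attribute can be null only on the cluster data: if I read the GraphX API right, Graph(vertices, edges) fills any vertex that appears only as an edge endpoint with a null attribute unless a defaultVertexAttr is supplied. As an experiment I can sketch a defensive variant against the simplified payload from above (names are placeholders, not the real job's code):

// Guard against null attributes before Pregel runs. If edges reference ids
// that are missing from the vertex RDD, GraphX gives those vertices a null
// attribute unless a defaultVertexAttr is supplied.
val initialGraph = graph.mapVertices { (id, attr: VData) =>
  if (attr == null) (0, id.toString) // hypothetical default payload
  else attr
}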