
Simple record count in PySpark


I am trying to learn Spark.

adultsrdd = sc.textFile("dbfs:/databricks-datasets/adult/adult.data")
educationrdd =adultsrdd.map(lambda row: row.split(',')[3])
educationrdd.take(5)

gives the following result:

Out[78]: [u'Bachelors', u'Bachelors', u'HS-grad', u'11th', u'Bachelors']

educationrdd.count()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 259.0 failed 1 times, most recent failure: Lost task 1.0 in stage 259.0 (TID 859, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Why do I get an error on count()?

Trace:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/databricks/spark/python/pyspark/rdd.py", line 1008, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/databricks/spark/python/pyspark/rdd.py", line 1008, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "", line 3, in <lambda>
IndexError: list index out of range

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1891)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1917)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 2371, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/databricks/spark/python/pyspark/rdd.py", line 1008, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/databricks/spark/python/pyspark/rdd.py", line 1008, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "", line 3, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

2 Answers

  • 1

    Most likely you have some rows that do not have 4 elements after the split. This usually happens when there are empty lines or similarly malformed rows. You can do one of the following two things:

    1. Replace such cases with a default value inside the map, like this:

    educationrdd = adultsrdd.map(lambda row: row.split(',')[3] if (row is not None and len(row.split(',')) > 3) else None)
    

    2. Use flatMap so that only valid rows contribute data:

    educationrdd = adultsrdd.flatMap(lambda row: [row.split(',')[3]] if (row is not None and len(row.split(',')) > 3) else [])
    

    Of course, you will probably want to replace the lambda with a function that does not split each row twice; see the sketch below.
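
    For illustration only (this is not from the answer above, and parse_education is a made-up name), such a single-split helper used with flatMap could look roughly like this:

    def parse_education(row):
        # Split the row once and keep the 4th field (education) only when the
        # row really has more than three comma-separated fields; returning an
        # empty list lets flatMap drop empty or malformed lines silently.
        fields = row.split(',')
        return [fields[3].strip()] if len(fields) > 3 else []

    educationrdd = adultsrdd.flatMap(parse_education)
    educationrdd.count()

    A quick way to see how many rows are actually malformed is adultsrdd.filter(lambda row: len(row.split(',')) <= 3).count().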

  • 0

    I had a similar problem and I tried something like this:

    numPartitions = 10  # a number, e.g. 10 or 100
    adultsrdd = sc.textFile("dbfs:/databricks-datasets/adult/adult.data", numPartitions)

    Inspired by "How to evenly repartition in Spark?" and by https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html
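
    As an aside, here is a minimal sketch (based on the linked knowledge-base page, not on the answer itself) of checking how many partitions the RDD actually got, assuming a PySpark version where RDD.getNumPartitions() is available:

    numPartitions = 10  # hypothetical value, e.g. 10 or 100
    adultsrdd = sc.textFile("dbfs:/databricks-datasets/adult/adult.data", numPartitions)
    # textFile treats the second argument as a minimum, so check what Spark really created:
    print(adultsrdd.getNumPartitions())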
