PySpark RuntimeError: Set changed size during iteration

I am running a PySpark script and hit the error below. It appears to be saying "RuntimeError: Set changed size during iteration" because of my code "if len(rdd.take(1)) > 0:". I am not sure whether that is the real cause and would like to know what actually went wrong. Any help would be appreciated.

Thanks!
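For context, a minimal sketch of the kind of streaming callback the traceback below points at. Only the function name _compute_glb_max and the emptiness check come from the traceback; the StreamingContext setup, the running-maximum logic, and the per-batch broadcast are assumptions added purely for illustration:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="mongo_kafka_spark_script")
    ssc = StreamingContext(sc, 2)  # 2-second batches (assumed)

    glb_max = sc.broadcast(0)  # hypothetical running maximum shared with workers

    def _compute_glb_max(rdd):
        global glb_max
        # Line 96 in the traceback below: the check that appears in the error.
        if len(rdd.take(1)) > 0:
            batch_max = rdd.max()
            if batch_max > glb_max.value:
                # Answer 1 below warns against creating broadcast variables
                # inside the streaming loop like this.
                glb_max = sc.broadcast(batch_max)

    # dstream stands for the Kafka input stream built elsewhere in the script.
    # dstream.foreachRDD(lambda t, rdd: _compute_glb_max(rdd))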

17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max
    if len(rdd.take(1)) > 0:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD
    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
RuntimeError: Set changed size during iteration
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 224, in <module>
    ssc.awaitTermination();
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 206, in awaitTermination
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o38.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py", line 96, in _compute_glb_max
    if len(rdd.take(1)) > 0:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1343, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 965, in runJob
    port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2439, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2372, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _prepare_for_python_RDD
    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
RuntimeError: Set changed size during iteration
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

2 Answers

  • 1

    Creating broadcast variables inside an iteration does not seem to be best practice. If you need stateful data, always use updateStateByKey instead.
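
    A minimal sketch of the updateStateByKey pattern this refers to (the checkpoint path, key name, and update logic are illustrative assumptions, not the asker's code; ssc and dstream stand for the existing StreamingContext and input DStream):

    # updateStateByKey keeps per-key state across batches, so a running value
    # can live in Spark-managed state instead of being re-broadcast every
    # iteration. It requires checkpointing to be enabled.
    ssc.checkpoint("/tmp/spark_checkpoint")  # hypothetical path

    def update_max(new_values, running_max):
        # new_values: this batch's values for the key; running_max: prior state
        candidates = list(new_values) + ([running_max] if running_max is not None else [])
        return max(candidates) if candidates else None

    # Map every record onto one constant key so a single state cell holds the
    # global maximum, then fold each batch into that state.
    global_max = dstream.map(lambda v: ("glb_max", v)).updateStateByKey(update_max)
    global_max.pprint()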

  • 2

    Try

    if rdd.count() < 1:
    

    take() can throw an exception; with more details, though, we could pin down the exact error.
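
    For context, a sketch of how the suggested guard might slot into the callback named in the traceback (the function body beyond the check is a placeholder, not the asker's code):

    def _compute_glb_max(rdd):
        # Skip empty batches up front, as suggested above. count() scans the
        # whole RDD; the built-in rdd.isEmpty() is an equivalent alternative
        # that only inspects the first element.
        if rdd.count() < 1:
            return
        # ... per-batch work on a non-empty RDD would go here ...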
