
Restoring pyspark streaming from a checkpoint


I am using pyspark streaming with checkpointing enabled. The first launch works, but on restart it crashes with the following error:

INFO scheduler.DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:441) failed in 1.160 s due to Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 86, h-.e-contenta.com, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py", line 163, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py", line 56, in read_command
    command = serializer.loads(command.value)
  File "/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/serializers.py", line 431, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named ...

The Python modules are added via the Spark context's addPyFile():

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import ujson

def create_streaming():
    """
    Create streaming context and processing functions
    :return: StreamingContext
    """
    sc = SparkContext(conf=spark_config)
    # zip the project's Python packages and ship them to the executors
    zip_path = zip_lib(PACKAGES, PY_FILES)
    sc.addPyFile(zip_path)
    ssc = StreamingContext(sc, BATCH_DURATION)

    stream = KafkaUtils.createStream(ssc=ssc, zkQuorum=','.join(ZOOKEEPER_QUORUM),
                                     groupId='new_group',
                                     topics={topic: 1})

    stream.checkpoint(BATCH_DURATION)
    stream = stream \
        .map(lambda x: process(ujson.loads(x[1]), geo_data_bc_value)) \
        .foreachRDD(lambda_log_writer(topic, schema_bc_value))

    ssc.checkpoint(STREAM_CHECKPOINT)
    return ssc

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate(STREAM_CHECKPOINT, lambda: create_streaming())
    ssc.start()
    ssc.awaitTermination()

1 Answer


Sorry, this was my mistake.

Try this:

if __name__ == '__main__':
    # recover the context from the checkpoint (the factory only runs on a fresh start)
    ssc = StreamingContext.getOrCreate(STREAM_CHECKPOINT, lambda: create_streaming())
    # re-add the zipped Python modules on the recovered context
    ssc.sparkContext.addPyFile(zip_lib(PACKAGES, PY_FILES))

    ssc.start()
    ssc.awaitTermination()
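
This works because StreamingContext.getOrCreate only invokes the setup function when no checkpoint data exists at STREAM_CHECKPOINT; on a restart the context is rebuilt from the checkpoint, so create_streaming() and the addPyFile() call inside it never run, and the executors cannot unpickle the functions that reference the zipped modules. Calling addPyFile() on ssc.sparkContext after getOrCreate ships the zip on every start, whether fresh or recovered.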
    
