I have a problem with a Spark Streaming application: I call foreachRDD before ssc.start(), but the application still fails to start.
This is the error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at zorro.stream.Main$.main(Main.scala:125)
    at zorro.stream.Main.main(Main.scala)
Code:
val sparkConf = new SparkConf().setAppName("Test Spark Streaming applic").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// load prediction models
println(">=================== LOAD MODELS =====================<")
for (surv <- survs) {
  for (id <- equipmentsIds) {
    models ++= Map(surv.name + "_" + id -> LinearRegressionModel.load(ssc.sparkContext, modelsPath + surv.name + "_" + id + ".ml"))
  }
}
var streams = Array[DStream[Item]]()
// predict and insert
println(">=================== INSERTIONS =====================<")
for (stream <- streams) {
  stream.foreachRDD(rdd => rdd.foreach(it => mdb.insertPredictionOnEquipement(it)))
}
println(">=================== START STREAMING =====================<")
ssc.start()
ssc.awaitTermination()
}
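One thing that may be worth checking, judging only from the code shown: `streams` is created as an empty array and nothing visible here adds a DStream to it, so the `for` loop body may never run and no `foreachRDD` would be registered before `ssc.start()`. The following is a minimal sketch of that idea, not the application itself. The object name `OutputOpSketch`, the queue-backed integer stream, and the `require` check are all hypothetical stand-ins; the real application would put its own DStreams in the array.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable

// Hypothetical sketch: names and the input source are assumptions, not the original code.
object OutputOpSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("output-op-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Stand-in source: a queue-backed stream holding one small RDD of integers.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 3))

    // The array must actually contain at least one DStream before the loop runs.
    val streams: Array[DStream[Int]] = Array(ssc.queueStream(queue))

    // Fail early with a clearer message if nothing was added to the array.
    require(streams.nonEmpty, "streams is empty: no foreachRDD would be registered")

    // Each iteration registers one output operation on the streaming graph.
    for (stream <- streams) {
      stream.foreachRDD(rdd => rdd.foreach(x => println(x)))
    }

    ssc.start()
    // Run briefly, then shut down so the sketch terminates on its own.
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

If the array in the sketch is replaced with `Array[DStream[Int]]()`, the `require` check fails before `ssc.start()` is reached, which makes the empty-array case easier to spot than the validation error raised by `start()`.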