I have a problem running my Spark Streaming application: I call foreachRDD before ssc.start(), but it does not work.

This is the error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at zorro.stream.Main$.main(Main.scala:125)
    at zorro.stream.Main.main(Main.scala)

Code:

    val sparkConf = new SparkConf().setAppName("Test Spark Streaming applic").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))


    // load prediction models
    println(">=================== LOAD MODELS =====================<")
    for (surv <- survs){
      for( id <- equipmentsIds){
        models ++= Map(surv.name+"_"+id -> LinearRegressionModel.load(ssc.sparkContext, modelsPath+surv.name+"_"+id+".ml"))
      }
    }
    var streams = Array[DStream[Item]]()

    // predict and insert
    println(">=================== INSERTIONS =====================<")
    for(stream <- streams){
      stream.foreachRDD(rdd => rdd.foreach(it => mdb.insertPredictionOnEquipement(it)))
    }
    println(">=================== START STREAMING =====================<")

    ssc.start()
    ssc.awaitTermination()
  }
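
A possible cause, assuming the snippet above is shown in full: streams is created as an empty array and never filled, so the body of the for loop never runs and no foreachRDD call is registered before ssc.start(). The sketch below shows the same pattern with at least one DStream in the collection. The queue-backed input, the Int element type, and the names OutputOperationSketch and registerOutputs are illustrative assumptions, not part of the original code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable

object OutputOperationSketch {
  // Registers one output operation per stream and returns how many were registered.
  // With an empty collection nothing is registered and 0 is returned.
  def registerOutputs(streams: Seq[DStream[Int]]): Int = {
    streams.foreach(s => s.foreachRDD(rdd => rdd.foreach(x => println(x))))
    streams.size
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("output-operation-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical input: a queue-backed DStream stands in for the real source.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(Seq(1, 2, 3)))
    val streams: Seq[DStream[Int]] = Seq(ssc.queueStream(queue))

    // Fail early with a clearer message than the one raised by ssc.start().
    val registered = registerOutputs(streams)
    require(registered > 0, "no DStreams to register; ssc.start() would fail")

    ssc.start()
    ssc.awaitTerminationOrTimeout(3000L) // run briefly, then shut down
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

If streams is meant to be populated from the loaded models, that has to happen before the loop that calls foreachRDD; the require check simply makes an empty collection visible at the point where it matters.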