I have a Spark Streaming (DStreams) application that reads from Kinesis, parses the events, deduplicates them, and stores them in Parquet format in a Hive table. The StreamingContext is recoverable - the context is created with the StreamingContext.getOrCreate function.

Deduplication is implemented with the mapWithState function, which is supposed to receive an initialState with the uniqIDs from the target table. But if I read that initial RDD inside the function that creates the StreamingContext, the application fails after a restart with "This RDD lacks a SparkContext".

import org.apache.spark.streaming.{Minutes, State, StateSpec, StreamingContext}

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(conf, streamBatchInterval)

    // Small lookup map read through the SparkContext
    // (the "key,value" split below is illustrative)
    val mapFromRDD = ssc.sparkContext.textFile("path")
      .map(_.split(",")).map(a => (a(0), a(1)))
      .collect.toMap

    /* init Kinesis Stream */
    val unionStreams = ssc.union(kinesisStreams)
    val transformedData = transformKVToJson(unionStreams)

    // Dedup function for mapWithState: emits the record the first time a
    // uniqID is seen, and ("None", "None") for duplicates
    val mappingDuplicatesFunc: (String, Option[String], State[Boolean]) => (String, String) =
      (uniqID: String, line: Option[String], state: State[Boolean]) => {
        val isDup = state.exists()
        if (!isDup) {
          state.update(false)
          (uniqID, line.getOrElse(""))
        } else {
          ("None", "None")
        }
      }

    /* Reading the initial state here is what breaks recovery after a restart:
       val initialDupsRDD = spark.table(targetTableName).where("condition").select("uniqID").rdd.map(x => (x.getString(0), true)) */

    val dataWithState = transformedData.mapWithState(StateSpec.function(mappingDuplicatesFunc).timeout(Minutes(60))/*.initialState(initialDupsRDD)*/)

    dataWithState.filter(out => !out._1.equals("None")).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        /* some business logic that builds dataFrame; successfully uses the mapFromRDD variable */
        dataFrame.repartition(1).write.mode("append").parquet("pathToTable")
      }
    }
    dataWithState.checkpoint(streamBatchInterval)
    ssc.checkpoint(checkpointDirectory)
    ssc
  }

val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createStreamingContext())
ssc.start()
ssc.awaitTermination()

It is somewhat similar to this question - Spark Streaming Job is not recoverable

But my question is: is there a legitimate way to read the Parquet initial state via the sparkContext rather than the sparkSession? Or to read it in some other class/function etc. and then import it into the streaming context?
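
For illustration, this is roughly the shape I am after; a minimal sketch, reusing the Parquet path and filter condition from above, with loadInitialDupsRDD as a hypothetical helper (whether rebuilding the SparkSession like this survives checkpoint recovery is exactly what I am unsure about):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper: rebuild a SparkSession on top of the (possibly
// checkpoint-recovered) SparkContext and read the known uniqIDs from Parquet
def loadInitialDupsRDD(sc: SparkContext): RDD[(String, Boolean)] = {
  val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
  spark.read.parquet("pathToTable")   // same path the stream appends to
    .where("condition")               // placeholder condition from the question
    .select("uniqID")
    .rdd
    .map(row => (row.getString(0), true))
}

/* then, inside createStreamingContext:
   val initialDupsRDD = loadInitialDupsRDD(ssc.sparkContext)
   StateSpec.function(mappingDuplicatesFunc).timeout(Minutes(60)).initialState(initialDupsRDD) */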