I have a Spark Streaming (DStreams) application that reads from Kinesis, parses the events, deduplicates them and stores them in Parquet format in a Hive table. The StreamingContext is recoverable - the context is created with the StreamingContext.getOrCreate function.
Deduplication is implemented with the mapWithState function, which should receive an initialState with the uniqIDs from the target table. But if I read an initial RDD inside the function that creates the StreamingContext, the application fails after a restart with "This RDD lacks a SparkContext".
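For reference, this is a minimal sketch of how I understand the initialState API is meant to be used to seed mapWithState; the names seedRDD and spec are illustrative only, not from my actual job:

    // sketch: seeding mapWithState with a (uniqID -> seen) state, assuming
    // mappingDuplicatesFunc and transformedData are defined as in the code below
    val seedRDD: RDD[(String, Boolean)] =
      ssc.sparkContext.parallelize(Seq(("id-1", true), ("id-2", true)))
    val spec = StateSpec.function(mappingDuplicatesFunc)
      .timeout(Minutes(60))
      .initialState(seedRDD)
    val deduped = transformedData.mapWithState(spec)

This works on a fresh start; the problem described below only appears when the context is restored from a checkpoint.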
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, streamBatchInterval)
  val mapFromRDD = ssc.sparkContext.textFile("path").collect.toMap
  /* init Kinesis Stream */
  val unionStreams = ssc.union(kinesisStreams)
  val transformedData = transformKVToJson(unionStreams)
  val mappingDuplicatesFunc: (String, Option[String], State[Boolean]) => (String, String) =
    (uniqID: String, line: Option[String], state: State[Boolean]) => {
      val isDup = state.exists()
      if (!isDup) {
        state.update(false)
        (uniqID, line.getOrElse(""))
      } else {
        ("None", "None")
      }
    }
  /* val initialDupsRDD = spark.table(targetTableName).where("condition").select("uniqID").rdd.map(x => (x.getString(0), true)) */
  val dataWithState = transformedData.mapWithState(StateSpec.function(mappingDuplicatesFunc).timeout(Minutes(60))/*.initialState(initialDupsRDD)*/)
  dataWithState.filter(out => !out._1.equals("None")).foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      /* some business logic, successfully uses the mapFromRDD variable */
      dataFrame.repartition(1).write.mode("append").parquet("pathToTable")
    }
  }
  dataWithState.checkpoint(streamBatchInterval)
  ssc.checkpoint(checkpointDirectory)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createStreamingContext())
ssc.start
ssc.awaitTermination
It is somewhat similar to this question - Spark Streaming Job is not recoverable
But my question is - is there a legitimate way to read the initial state from Parquet via the sparkContext instead of a sparkSession? Or to read it in some other class/function/etc. and then import it into the streaming context?