我正在尝试一些关于spark的实践,我尝试使用spark streaming来读取kafka主题中的数据并将该数据存储在elasticsearch索引中 . 我正在尝试从我的ide运行我的代码 . 我在Kafka中添加了一些消息并运行了我的Kafka Streaming上下文程序,它读取了数据,但之后程序停止了 . 所以,如果我在kafka中添加新数据,我必须再次运行我的流上下文程序 . 我希望流式上下文能够“监听”kafka代理,每次我在Kafka代理中添加一些消息时都不应该运行它 . 这是我的代码:

val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaStreams2")
                    val ssc = new StreamingContext(conf, Seconds(1))
                    val kafkaParams = Map(
                            "bootstrap.servers" -> "localhost:9092",
                            "key.deserializer" -> classOf[LongDeserializer],
                            "value.deserializer" -> classOf[StringDeserializer],
                            "group.id" -> "spark-streaming-notes2",
                            "auto.offset.reset" -> "latest"
                            )
                    // List of topics you want to listen for from Kafka
                    val topics = List("inputstream-sink")
                    val lines = KafkaUtils.createDirectStream[String, String](ssc,
                            PreferConsistent,
                            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
                            )

                    val word = lines.map(_.value())
                    word.print()
                    insertIntoIndexes()
                    ssc.start()
                    ssc.stop(stopSparkContext = false)