如果我创建并启动KafkaStream实例,然后在关闭钩子中调用.close(),则不会发生任何事情 - 日志记录表明存在的一个StreamThread已进入PENDING_SHUTDOWN状态,但随后就像这样永远 . 我已经尝试了一切没有运气 - 已经阅读了Kafka流源代码,看看它在关机期间做了什么,但在我看来,代码暗示StreamThread,如果处于运行状态,将永远不会停止(这不能正确 - 我在Kafka JIRA中没有看到任何这种性质的错误 .

这是我简单的KafkaStream(scala)应用程序的相关代码:

val props: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1")
 p
}


implicit val produced = Produced.`with`(new StringSerde(), new StringSerde())

val builder: StreamsBuilder = new StreamsBuilder()
val in: KStream[String, String] = builder.stream[String, String]("input-topic")

in.map((k,v) =>{ 
         println("Consumed and transforming value")
         (k,s"$v_transformed") 
     }).to("output-topic")

val streams: KafkaStreams = new KafkaStreams(builder.build(), props)

streams.start()

sys.addShutdownHook(streams.stop())

正如您所看到的,它只是从一个主题中读取,对消费记录中的值进行微小更改并将其写入另一个主题 .

启动流,应用程序将在向其发送SIGINT时调用stop() .

当I ^ C终止进程时,我看到Kafka日志记录表明StreamThread-1正在转换为PENDING_SHUTDOWN,并且它已经完成了 . 它最终(在几秒钟内)到达状态NOT_RUNNING但它永远不会,并且它继续为它继续从输入主题读取的每个记录输出我的println语句 .

我在这做错了什么?

更新:根据评论者的建议,我尝试使用60秒的超时调用close(),最终得到了这个,但仍然没有关闭: -

[shutdownHook1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ - ] Streams客户端无法在超时内完全停止