我想从Kafka主题中读取消息并发送到Web套接字客户端 . 我使用akka-stream-kafka创建了一个示例Web套接字服务器应用程序 .

我正在使用kafka-console-producer脚本向kafka主题发送消息 . 工作正常,浏览器客户端通过Web套接字从kafka主题接收数据 .

如果我在30秒内没有向主题发送消息,服务器会给出以下消息:

[KafkaServiceActor-akka.actor.default-dispatcher-2] [akka:// KafkaServiceActor / system / kafka-consumer-2]消息[akka.kafka.KafkaConsumerActor $ Internal $ Stop $]没有发送者到Actor [akka:// KafkaServiceActor / system / kafka-consumer-2#-1039823616]未送达 . [1]遇到死信 . 可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'关闭或调整此日志记录 .

在此消息之后,Web套接字客户端不会从Kafka主题获取任何新消息 . 我的客户需要重新连接才能开始从主题获取消息 .

感谢任何反馈 .

val incoming = Sink.foreach(println)

def kafkaOut(clientId: String) =
  Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC)).map {
    r =>
      val value = r.value()
      TextMessage(value)
  }

val requestHandler: HttpRequest => HttpResponse = {
  case req @ HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
    req.header[UpgradeToWebSocket] match {
      case Some(upgrade) => {
        val clientId = "akka-kafka-client-" + clientIdGen.nextInt(1000) 
        upgrade.handleMessagesWithSinkSource(incoming, kafkaOut(clientId))
      }
      case None => HttpResponse(400, entity = "Not a valid websocket request")
    }

  case req: HttpRequest =>
    req.discardEntityBytes() //drain the incoming http entity stream
    HttpResponse(400, entity = "Unknown service")
}

private def startServer(): Unit = {
  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = HOST, PORT)

  bindingFuture =
    serverSource.to(Sink.foreach { connection =>
      connection.handleWithSyncHandler(requestHandler)
    }).run()

  log.info(s"Server online at http://localhost:9000/")
}
//I do not get any error with following changes 
def kafkaOut(clientId: String) = {

val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

futureQueue.map { queue =>
  Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC))
    .runForeach(t => queue.offer(t.value()))
}

queueSource.map(x => TextMessage(x))
}                                                                                                                
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
  p.trySuccess(m)
  m
}
(s, p.future)
}