我想从Kafka主题中读取消息并发送到Web套接字客户端 . 我使用akka-stream-kafka创建了一个示例Web套接字服务器应用程序 .
我正在使用kafka-console-producer脚本向kafka主题发送消息 . 工作正常,浏览器客户端通过Web套接字从kafka主题接收数据 .
如果我在30秒内没有向主题发送消息,服务器会给出以下消息:
[KafkaServiceActor-akka.actor.default-dispatcher-2] [akka:// KafkaServiceActor / system / kafka-consumer-2]消息[akka.kafka.KafkaConsumerActor $ Internal $ Stop $]没有发送者到Actor [akka:// KafkaServiceActor / system / kafka-consumer-2#-1039823616]未送达 . [1]遇到死信 . 可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'关闭或调整此日志记录 .
在此消息之后,Web套接字客户端不会从Kafka主题获取任何新消息 . 我的客户需要重新连接才能开始从主题获取消息 .
感谢任何反馈 .
val incoming = Sink.foreach(println)
def kafkaOut(clientId: String) =
Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC)).map {
r =>
val value = r.value()
TextMessage(value)
}
val requestHandler: HttpRequest => HttpResponse = {
case req @ HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => {
val clientId = "akka-kafka-client-" + clientIdGen.nextInt(1000)
upgrade.handleMessagesWithSinkSource(incoming, kafkaOut(clientId))
}
case None => HttpResponse(400, entity = "Not a valid websocket request")
}
case req: HttpRequest =>
req.discardEntityBytes() //drain the incoming http entity stream
HttpResponse(400, entity = "Unknown service")
}
private def startServer(): Unit = {
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = HOST, PORT)
bindingFuture =
serverSource.to(Sink.foreach { connection =>
connection.handleWithSyncHandler(requestHandler)
}).run()
log.info(s"Server online at http://localhost:9000/")
}
//I do not get any error with following changes
def kafkaOut(clientId: String) = {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Consumer.plainSource(consumerSettings.withGroupId(clientId), Subscriptions.topics(KAFKA_TOPIC))
.runForeach(t => queue.offer(t.value()))
}
queueSource.map(x => TextMessage(x))
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}