我在我的服务中创建了一个lagom主题 . def addingsTopic(): Topic[ProcessDefinitionAdded] 描述符:

override def descriptor: Descriptor = {
    import Service._
    named(ProductionService.TOPIC_NAME).withCalls(
      pathCall(pathProcessDef(":id"), getProcessDefinition _)
    ).withTopics(
      topic("processdef", addingsTopic _ )
        .addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[ProcessDefinitionAdded](_.entityId.toString)
        ),
      topic("productionserviceorder", ordersTopic _)
        .addProperty(
          KafkaProperties.partitionKeyStrategy,
          PartitionKeyStrategy[OrderAddedTopic](_.entityId.toString)
        )
    ).withAutoAcl(true)
  }

直到这里一切都很清楚 . 现在我有一个播放网页 . 我在我的控制器中有一个Websocket .

val k = productionService.addingsTopic().subscribe.atMostOnceSource.mapAsync(1){e ⇒
    productionService.getProcessDefinition(e.entityId).invoke().map{e ⇒
      Json.toJson(e)(ProcessDefinitionResponse.format)
    }
  }.toMat(BroadcastHub.sink(256) )(Keep.right).run().recover{
    case e ⇒ println(e)
      Json.toJson("error "+e)
  }
  val f = Flow.fromSinkAndSource(Sink.ignore,k.concat(Source.maybe))

  def fetchProcessDefinition() = WebSocket.accept[String,JsValue] { req ⇒
    f
  }

我的javascript网站使用此连接:

<script>
        try{
            console.log("START");
            var socket = new WebSocket("ws://localhost:9000/processdef/stream");

            socket.onopen = function(s){
                console.log("OPEN");
                console.log(s);
            };

            socket.onerror = function(error){
                console.log("ERROR: ");
                console.log(error);
            };

            socket.onmessage = function(msg){
                console.log("Message:");
                console.log(JSON.stringify(msg.data));
            };

            socket.onclose = function(closeevent){
                console.log("CLOSE:");
                console.log(closeevent);
            };

            socket.send("")
        }catch (e) {
            console.error(e);
        }
    </script>

我在sbt runAll之后在localhost上加载页面 . 这需要一段时间才能连接(有时是几分钟,有时是几秒钟) . 大约90秒后,控制台打印出连接已关闭 . 在IDE上我收到此错误:

2018-10-18T10:54:16.655Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-213, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.654UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer
2018-10-18T10:54:16.674Z [error] akka.actor.ActorSystemImpl [sourceThread=application-akka.actor.default-dispatcher-231, akkaSource=akka.actor.ActorSystemImpl(application), sourceActorSystem=application, akkaTimestamp=10:54:16.674UTC] - Websocket handler failed with The connection closed with error: The connection was reset by peer
akka.stream.StreamTcpException: The connection closed with error: The connection was reset by peer

这是非常紧张的 . 我的代码中有任何错误吗?

谢谢你的帮助 .