首页 文章

Akka - Sender [null]发送了类型的消息

提问于
浏览
1

我有一条喷雾路线,我宣布我的一个演员 .

val myActor = actorRefFactory.actorSelection("/user/my-actor")

我的路线看起来像:

get {
  path(Segment / Segment) { (poolId, trackId) =>
    respondWithMediaType(MediaTypes.`application/json`) {
      val request = Request(poolId, trackId)

      val f = (myActor ? request)
        .recoverWith {
          case a: AskTimeoutException =>
            Future.failed[StandardRoute](throw new Exception(s"We got a timeout", a))

          case e: Exception => Future.failed[StandardRoute](throw new Exception(s"We got an error", e))
        }

      onComplete(f) {
        case Success(resp) => complete(OK, resp)

        case Failure(e) =>
          log.error(s"Fatal request error: $trackId / $poolId", e)
          complete(InternalServerError, ErrorCodes.ErrorNotHandled)
      }
    }
  }
}

有时,我可以看到当我同时收到大量请求时,其中一些可能会因以下消息而失败:

引起:akka.pattern.AskTimeoutException:在[8000 ms]之后询问[ActorSelection [Anchor(akka:// default /),Path(/ user / my-actor)]]的超时时间 . Sender [null]发送了类型为“my.company.messages.Request”的消息 .

问题是,如果我采取相同的请求,并尝试再次发送它,它的工作原理,有时只会发生这种情况,我不知道如何解决这个问题 .

演员确实做了很多事情,里面有很多未来,直到它给喷射路线返回一个值 .

在actor中,我创建了一个名为replyTo的val,以保持发送者的值 .

有关为什么有时我会收到此错误的任何想法?

EDIT

关于我如何管理myActor的一个例子:

class MyActor extends Actor with ActorLogging {

  private implicit val timeout = Timeout(8.seconds)

  def receive = {

    case req: Request =>

      val replyTo = sender()

      doOneThing.map { one =>

        doSecondThing(one).map { sec =>
          replyTo ! sec
        }
      }
  }
}

doOneThing和doSecondThing在哪里是期货......我有很多在不同情况下围绕这个演员传播 .

3 回答

  • 0

    您看到的Sender [null]是正常行为 . ask方法 ? 采用隐式参数 sender ,默认值为 ActorRef.noSender . 通常情况下,如果你在 Actor 内,你在 self 范围内有一个隐含的 ActorRef ,但由于你不在 Actor ,它只是采用默认值 .

    您错误的原因可能是收到您的消息的 Actor 没有及时响应 .

  • 3

    我认为你没有在这里使用路由,这就是你得到超时异常的原因 . 例如,假设您的一个请求需要1秒才能执行 . 并且假设您一次收到4个请求,当请求到达一个actor时,它会附加到一个队列中,并且对于您等待8秒的每个请求 . 因此在队列中假设第4个请求将在4秒内执行,因此您将在8秒内获得响应 . 但是当一次发出100个请求并且你的一个actor可以处理它们时,并且对于第100个请求,它将花费100秒来执行但是你只等待第8个请求,这就是你获得超时的原因例外 .

    所以解决方案是,你可以在这里使用路由例如 -

    system.actorOf(RoundRobinPool(concurrency).props(Props(new MyActor())))

    您可以在系统可用进程上设置并发性 . 假设如此设置并发值100,现在对于第100个请求,执行只需2秒 . 所以我认为路由可以是一个解决方案 .

    如果您不知道有多少请求可能是1k或更多,那么您可以动态创建actor,您可以根据请求动态创建actor . 所以无论什么时候发出请求,你的一个独立的演员都会在那里为它提供服务 .

  • 0

    当我没有正确地将回复发送回发件人时,我会收到这种错误 .

    不正确(只返回值):

    def receive = {
      case DataFetch =>
        data
    }
    

    正确(将值发送到 sender ):

    def receive = {
        case DataFetch =>
          sender ! data
      }
    

相关问题