首页 文章

Akka Ask with Timed Retry

提问于
浏览
3

这是一个简单的函数,我写了一个Akka“问”与定时重试 . 有一个明显的竞争条件,我不知道如何解决 .

def askWithRetry(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] =
  (actor ? message)(timeout) recoverWith { case e: AskTimeoutException =>
      // do a retry. currently there is no retry limit for simplicity.
      askWithRetry(actor, message, timeout)
    }

通常,这是有效的 . "ask"或 ? 为每个调用创建一个临时中间actor . 如果目标发送响应消息,则临时"ask actor"将结果作为成功完成放入Future中 . 如果目标没有及时响应,则将来会以超时异常完成,并且recoverWith进程会重试 .

但是,存在竞争条件 . 如果目标将响应消息发送给临时“ask actor”,但在响应消息之前处理超时,则响应消息将丢失 . 重试过程使用新的临时actor重新发送新请求 . 由于响应消息被发送到先前临时的“ask actor”,现在已经不存在,因此它将不会被处理并丢失 .

我怎样才能解决这个问题?

我可以使用内置的重试逻辑编写一个自定义版本的Ask模式来修复这种竞争条件......如果有更多的标准选项,我讨厌使用不必要的自定义代码 .

UPDATE :这是我最终使用的自定义版本:

object AskWithRetry {
  def askWithRetry(context: ActorContext, actor: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Future[Any] = {
    val p = Promise[Any]

    val intermediate = context.actorOf(props(p, actor, message, retryInterval, maxRetries))

    p.future
  }

  def props(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Props =
          Props(new AskWithRetryIntermediateActor(promise, target, message, retryInterval, maxRetries))
}

class AskWithRetryIntermediateActor(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, var maxRetries: Option[Int]) extends Actor {
  def doSend(): Unit = target ! message

  def receive: Receive = {
    case ReceiveTimeout =>
      maxRetries match {
        case None =>
          //println(s"Retrying. Infinite tries left. ${message}")
          doSend()
        case Some(retryCount) =>
          if (retryCount > 0) {
            //println(s"Retrying. ${retryCount-1} tries left. ${message}")
            maxRetries = Some(retryCount - 1)
            doSend()
          } else {
            //println(s"Exceeded timeout limit. Failing. ${message}")
            if (!promise.isCompleted) {
              promise.failure(new AskTimeoutException("retry limit reached"))
            }
            context.stop(self)
          }
      }
    case otherMessage: Any =>
      if (!promise.isCompleted) {
        //println(s"AskWithRetry: completing ${otherMessage}")
        promise.success(otherMessage)
      }
      context.stop(self)
  }

  context.setReceiveTimeout(retryInterval)
  doSend()
}

1 回答

  • 3

    我觉得你的直觉很好 . 如果你想要自定义actor-logic,你应该写它 .

    自定义请求等待的actor应该将消息发送到 actorscheduleOnce 消息给自己重试 . 这样,响应和超时都通过 receive 方法到达,并且您没有任何比赛 .

相关问题