这是一个简单的函数,我写了一个Akka“问”与定时重试 . 有一个明显的竞争条件,我不知道如何解决 .
def askWithRetry(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] =
(actor ? message)(timeout) recoverWith { case e: AskTimeoutException =>
// do a retry. currently there is no retry limit for simplicity.
askWithRetry(actor, message, timeout)
}
通常,这是有效的 . "ask"或 ?
为每个调用创建一个临时中间actor . 如果目标发送响应消息,则临时"ask actor"将结果作为成功完成放入Future中 . 如果目标没有及时响应,则将来会以超时异常完成,并且recoverWith进程会重试 .
但是,存在竞争条件 . 如果目标将响应消息发送给临时“ask actor”,但在响应消息之前处理超时,则响应消息将丢失 . 重试过程使用新的临时actor重新发送新请求 . 由于响应消息被发送到先前临时的“ask actor”,现在已经不存在,因此它将不会被处理并丢失 .
我怎样才能解决这个问题?
我可以使用内置的重试逻辑编写一个自定义版本的Ask模式来修复这种竞争条件......如果有更多的标准选项,我讨厌使用不必要的自定义代码 .
UPDATE :这是我最终使用的自定义版本:
object AskWithRetry {
def askWithRetry(context: ActorContext, actor: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Future[Any] = {
val p = Promise[Any]
val intermediate = context.actorOf(props(p, actor, message, retryInterval, maxRetries))
p.future
}
def props(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, maxRetries: Option[Int]): Props =
Props(new AskWithRetryIntermediateActor(promise, target, message, retryInterval, maxRetries))
}
class AskWithRetryIntermediateActor(promise: Promise[Any], target: ActorRef, message: Any, retryInterval: Duration, var maxRetries: Option[Int]) extends Actor {
def doSend(): Unit = target ! message
def receive: Receive = {
case ReceiveTimeout =>
maxRetries match {
case None =>
//println(s"Retrying. Infinite tries left. ${message}")
doSend()
case Some(retryCount) =>
if (retryCount > 0) {
//println(s"Retrying. ${retryCount-1} tries left. ${message}")
maxRetries = Some(retryCount - 1)
doSend()
} else {
//println(s"Exceeded timeout limit. Failing. ${message}")
if (!promise.isCompleted) {
promise.failure(new AskTimeoutException("retry limit reached"))
}
context.stop(self)
}
}
case otherMessage: Any =>
if (!promise.isCompleted) {
//println(s"AskWithRetry: completing ${otherMessage}")
promise.success(otherMessage)
}
context.stop(self)
}
context.setReceiveTimeout(retryInterval)
doSend()
}
1 回答
我觉得你的直觉很好 . 如果你想要自定义actor-logic,你应该写它 .
自定义请求等待的actor应该将消息发送到
actor
和scheduleOnce
消息给自己重试 . 这样,响应和超时都通过receive
方法到达,并且您没有任何比赛 .