首页 文章

在发生故障时解决Akka期货问题

提问于
浏览
9

我在Spray应用程序中使用ask模式调用Actor,并将结果作为HTTP响应返回 . 我将演员的失败映射到自定义错误代码 .

val authActor = context.actorOf(Props[AuthenticationActor])

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
  complete(StatusCodes.OK, user)
}

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
 onComplete(f) {
  case Success(value: T) => cb(value)
  case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
  case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
  //In reality this returns a custom error object.
 }
}

当authActor发送失败时,这可以正常工作,但如果authActor抛出异常,则在询问超时完成之前不会发生任何事情 . 例如:

override def receive: Receive = {
  case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}

我知道Akka的文档说的是

要通过例外完成未来,您需要向发件人发送失败消息 . 当actor在处理消息时抛出异常时,不会自动执行此操作 .

但鉴于我在Spray路由演员和服务演员之间使用了很多接口,我宁愿不用try / catch包装每个子actor的接收部分 . 是否有更好的方法来实现子actor中异常的自动处理,并在发生异常时立即解决未来问题?

编辑:这是我目前的解决方案 . 然而,为每个儿童演员做这件事都很麻烦 .

override def receive: Receive = {
case default =>
  try {
    default match {
      case _ => throw new ServiceException("")//Actual code would go here
    }
  }
  catch {
    case se: ServiceException =>
      logger.error("Service error raised:", se)
      sender ! Failure(se)
    case ex: Exception =>
      sender ! Failure(ex)
      throw ex
  }
}

这样,如果它是预期的错误(即ServiceException),则通过创建失败来处理 . 如果它是意外的,它会立即返回失败,以便解决未来,但随后抛出异常,以便SupervisorStrategy仍然可以处理它 .

1 回答

  • 16

    如果您想要一种方法在发生意外异常时自动将响应发送回发件人,那么这样的事情可能对您有用:

    trait FailurePropatingActor extends Actor{
      override def preRestart(reason:Throwable, message:Option[Any]){
        super.preRestart(reason, message)
        sender() ! Status.Failure(reason)
      }
    }
    

    我们覆盖 preRestart 并将故障传播回发送方 Status.Failure ,这将导致上游 Future 失败 . 此外,在这里调用 super.preRestart 非常重要,因为这是儿童停止发生的地方 . 在演员中使用它看起来像这样:

    case class GetElement(list:List[Int], index:Int)
    class MySimpleActor extends FailurePropatingActor {  
      def receive = {
        case GetElement(list, i) =>
          val result = list(i)
          sender() ! result
      }  
    }
    

    如果我是这样调用这个actor的实例:

    import akka.pattern.ask
    import concurrent.duration._
    
    val system = ActorSystem("test")
    import system.dispatcher
    implicit val timeout = Timeout(2 seconds)
    val ref = system.actorOf(Props[MySimpleActor])
    val fut = ref ? GetElement(List(1,2,3), 6)
    
    fut onComplete{
      case util.Success(result) => 
        println(s"success: $result")
    
      case util.Failure(ex) => 
        println(s"FAIL: ${ex.getMessage}")
        ex.printStackTrace()    
    }
    

    然后它会正确地击中我的 Failure 块 . 现在,当 Futures 没有涉及扩展该特征的actor时,该基本特征中的代码运行良好,就像这里的简单actor一样 . 但是如果你使用 Future 然后你需要小心,因为 Future 中发生的异常不会导致actor中的重启,而且,在 preRestart 中,对 sender() 的调用将不会返回正确的引用,因为该actor已经进入下一条消息 . 像这样的演员表明了这个问题:

    class MyBadFutureUsingActor extends FailurePropatingActor{
      import context.dispatcher
    
      def receive = {
        case GetElement(list, i) => 
          val orig = sender()
          val fut = Future{
            val result = list(i)
            orig ! result
          }      
      } 
    }
    

    如果我们在之前的测试代码中使用此actor,我们将始终在失败情况下获得超时 . 为了缓解这种情况,您需要将期货结果反馈给发件人,如下所示:

    class MyGoodFutureUsingActor extends FailurePropatingActor{
      import context.dispatcher
      import akka.pattern.pipe
    
      def receive = {
        case GetElement(list, i) => 
          val fut = Future{
            list(i)
          }
    
          fut pipeTo sender()
      } 
    }
    

    在这种特殊情况下,actor本身不会重新启动,因为它没有遇到未捕获的异常 . 现在,如果你的演员需要在未来之后进行一些额外的处理,那么当你得到一个 Status.Failure 时,你可以回到自己并显式地失败:

    class MyGoodFutureUsingActor extends FailurePropatingActor{
      import context.dispatcher
      import akka.pattern.pipe
    
      def receive = {
        case GetElement(list, i) => 
          val fut = Future{
            list(i)
          }
    
          fut.to(self, sender())
    
        case d:Double =>
          sender() ! d * 2
    
        case Status.Failure(ex) =>
          throw ex
      } 
    }
    

    如果这种行为变得普遍,你可以将它提供给任何需要它的演员:

    trait StatusFailureHandling{ me:Actor =>
      def failureHandling:Receive = {
        case Status.Failure(ex) =>
          throw ex      
      }
    }
    
    class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
      import context.dispatcher
      import akka.pattern.pipe
    
      def receive = myReceive orElse failureHandling
    
      def myReceive:Receive = {
        case GetElement(list, i) => 
          val fut = Future{
            list(i)
          }
    
          fut.to(self, sender())
    
        case d:Double =>
          sender() ! d * 2        
      } 
    }
    

相关问题