首页 文章

如何处理Akka Actor内部WS调用的超时

提问于
浏览
0

我有以下actor向WebService发送请求:

class VigiaActor extends akka.actor.Actor {
  val log = Logging(context.system, this)

  context.setReceiveTimeout(5 seconds)

  import VigiaActor._
  def receive = {
    case ObraExists(numero: String, unidadeGestora: String) =>
      WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""").withHeaders("Authorization" -> newToken).get.pipeTo(sender)
    case ReceiveTimeout =>
      val e = TimeOutException("VIGIA: Receive timed out")
      throw e
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: TimeOutException         => Resume       
      case _: Exception                => Restart
    }
}

对此actor的调用是验证方法的一部分,该方法应在尝试与WS通信时发生超时时抛出异常:

implicit val timeout = Timeout(5 seconds)
lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

(vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
  case r : WSResponse =>
    val exists = r.body.toBoolean

    if (!exists && empenho.tipoMeta.get.equals(4)) {
      erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
    }

  case _ => erros.adicionarErro(controle.codigoArquivo, row, line, s"Nº de Obra não informado ou inválido para o Tipo de Meta 4 - Obras" , TipoErroImportacaoEnum.WARNING)
}

我是这个Actor的新手,我正在尝试解决代码上的一些阻塞情况 .

问题是我不知道如何在演员的电话中使用"catch" .

UPDATE

切换验证方法:

protected def validateRow(row: Int, line: String, empenho: Empenho, calendarDataEnvioArquivo: Calendar)(implicit s: Session, controle: ControleArquivo, erros:ImportacaoException): Unit = {
    implicit val timeout = Timeout(5 seconds)
    lazy val vigiaActor : ActorRef = Akka.system.actorOf(Props[VigiaActor])

    (vigiaActor ? VigiaActor.ObraExists(empenho.obra.get, empenho.unidadeGestora)).map {
      case e: TimeOutException => println("TIMOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOUT!!!!")
      case r: WSResponse => {...}
    }
}

和演员ReceiveTimout部分:

case ReceiveTimeout =>
  val e = TimeOutException("VIGIA: Receive timed out")
  sender ! e

我像以前一样得到以下日志消息:

[INFO] [07/20/2017 10:28:05.738] [application-akka.actor.default-dispatcher-5] [akka:// application / deadLetters]来自Actor [akka]的消息[model.exception.TimeOutException] :// application / user / $ c#1834419855]对于演员[akka:// application / deadLetters]未送达 . [1]遇到死信 . 可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'关闭或调整此日志记录 .

2 回答

  • 0

    context.setReceiveTimeout(5 seconds) 触发向 VigiaActor 发送 ReceiveTimeout 消息,如果该演员没有收到消息五秒钟 . Akka在内部将 ReceiveTimeout 发送给您的演员,这就是为什么在您更新的代码中,尝试将异常发送到 sender 并没有达到预期效果 . 换句话说, case ReceiveTimeout => 子句中的 sender 不是 ObraExists 消息的原始发件人 .

    VigiaActor 中设置接收超时与 WS 请求超时无关,因为如果请求超时,则不会向 VigiaActor 发送消息 . 即使在5秒内未完成 WS 请求时向该actor发送了一条消息,同时另一条消息可能已在该actor的邮箱中排队,因此无法触发 ReceiveTimeout .

    简而言之,设置actor的接收超时不是处理 WS 请求超时的正确机制 . (使用当前的方法将 get 请求的结果传递给发送方,您可以调整发送方以处理超时 . 事实上,我完全放弃 VigiaActor 并直接在 validateRow 方法中调用 WS ,但是摆脱演员可能不是你的问题 . )

    如果您必须在actor中处理 WS 请求超时,则执行此操作的一种方法如下所示:

    import scala.util.{Failure, Success}
    
    class VigiaActor extends akka.actor.Actor {
      import VigiaActor._
      val log = Logging(context.system, this)
    
      def receive = {
        case ObraExists(numero: String, unidadeGestora: String) =>
          val s = sender // capture the original sender
          WS.url(baseURL + s"""/obras/exists/$unidadeGestora/$numero""")
            .withHeaders("Authorization" -> newToken)
            .withRequestTimeout(5 seconds) // set the timeout
            .get
            .onComplete {
              case Success(resp) =>
                s ! resp
              case Failure(e: scala.concurrent.TimeoutException) =>
                s ! TimeOutException("VIGIA: Receive timed out")
              case Failure(_) =>
                // do something in the case of non-timeout failures
            }
      }
    }
    
  • 1

    我认为你过分诠释了这种心态 . 在特殊情况下,您只在Actors中抛出异常 . 也就是说,如果出现意外崩溃的情况,你可以构建你的Actors来应对 . 但如果它是正常和合理预期的东西,你就像对待任何其他代码路径一样对待它 .

    所以在你的情况下,它与throw或catch无关 - 在你的 ReceiveTimeout 子句中,只是将消息发送回原始发件人,说请求因超时而失败,并让发件人处理它但是他们考虑适当 . 它最终与您的成功案例相似 .

相关问题