首页 文章

如何管理一套Akka期货

提问于
浏览
1

我有一套未来[T],我想管理到我正在编写的库的单个对象 . 在我目前的实现中,我使用Future.sequence来收集它们并等到它们已经解决,所以我可以在它们上做未来的事情(map,collect,filter) . 然而,这只能让我有能力匹配成功或失败,这不一定是我正在处理的期货收集的情况 . 有些会失败,有些会失败,我希望能够从成功的那些中提取我可以获得的值,并收集其他的异常和错误,以便我可以适当地升级它们 . 在伪代码中它会是这样的

Future.sequence(Set[Future[T]]) andThen {
  case FullSuccess => "woot"
  case SomeErrors  => "well, that's still ok."
  case FullErrors  => "Ok, who's the wise guy."
}

我真正想要的是拥有数据的数据,如果序列中只有1个期货失败,则不必返回完全失败 .

谢谢您的帮助 .

1 回答

  • 1

    不幸的是,你的案例没有内置帮助器,但是很容易创建你自己的:

    import scala.concurrent.{Await, Future}
    import scala.util.{Failure, Success, Try}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.DurationInt
    
    def sequenceOfTries[T](futures: Seq[Future[T]]): Future[Seq[Try[T]]] =
      futures.foldLeft(Future.successful(List[Try[T]]())) {
        case (accF, f) => accF.flatMap {
          acc => f.map(v => Success(v) :: acc).recover { case ex => Failure(ex) :: acc }
        }
      }.map(_.reverse)
    
    
    val v = Seq(
      Future.successful(1),
      Future.failed(new IllegalStateException("2")),
      Future.successful(3),
      Future.failed(new IllegalStateException("4"))
    )
    
    Await.result(sequenceOfTries(v), 1.second)
    

    结果:

    v: Seq[scala.concurrent.Future[Int]] = List(scala.concurrent.impl.Promise$KeptPromise@2416f7e5, scala.concurrent.impl.Promise$KeptPromise@2aaf675d, scala.concurrent.impl.Promise$KeptPromise@360d48f, scala.concurrent.impl.Promise$KeptPromise@230f8be2)    
    
    res0: Seq[scala.util.Try[Int]] = List(Success(1), Failure(java.lang.IllegalStateException: 2), Success(3), Failure(java.lang.IllegalStateException: 4))
    

    UPD . 或者你可以像这样利用 Future.sequence (结果相同):

    def sequenceOfTries[T](futures: Seq[Future[T]]): Future[Seq[Try[T]]] =
      Future.sequence(futures.map(_.map(x => Success(x)).recover { case ex => Failure(ex) }))
    

相关问题