首页 文章

如何等待几个期货

提问于
浏览
78

假设我有几个期货,需要等到其中任何一个失败或全部成功 .

例如:让3个期货: f1f2f3 .

  • 如果 f1 成功且 f2 失败,我不等待 f3 (并将失败返回给客户端) .

  • 如果 f2f1f3 仍在运行时失败,我不等待它们(并返回失败)

  • 如果 f1 成功,然后 f2 成功,我继续等待 f3 .

你会如何实现它?

8 回答

  • 4

    您可以使用for-comprehension,如下所示:

    val fut1 = Future{...}
    val fut2 = Future{...}
    val fut3 = Future{...}
    
    val aggFut = for{
      f1Result <- fut1
      f2Result <- fut2
      f3Result <- fut3
    } yield (f1Result, f2Result, f3Result)
    

    在这个例子中,期货1,2和3并行启动 . 然后,在理解中,我们等到结果1然后是2然后3可用 . 如果1或2失败,我们将不再等待3 . 如果所有3都成功,那么 aggFut val将保持一个带有3个时隙的元组,对应于3个期货的结果 .

    现在如果你需要停止等待的行为,如果说fut2首先失败,事情会变得有点棘手 . 在上面的例子中,你必须等到fut1完成才能实现fut2失败 . 要解决这个问题,你可以尝试这样的事情:

    val fut1 = Future{Thread.sleep(3000);1}
      val fut2 = Promise.failed(new RuntimeException("boo")).future
      val fut3 = Future{Thread.sleep(1000);3}
    
      def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
        val fut = if (futures.size == 1) futures.head._2
        else Future.firstCompletedOf(futures.values)
    
        fut onComplete{
          case Success(value) if (futures.size == 1)=> 
            prom.success(value :: values)
    
          case Success(value) =>
            processFutures(futures - value, value :: values, prom)
    
          case Failure(ex) => prom.failure(ex)
        }
        prom.future
      }
    
      val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
      aggFut onComplete{
        case value => println(value)
      }
    

    现在这可以正常工作,但问题来自于知道哪一个 Future 在成功完成后从 Map 中删除 . 只要你有一些方法可以将结果与产生该结果的Future进行正确关联,那么这样的方法就可以了 . 它只是递归地从 Map 中删除已完成的Futures,然后在剩余的 Futures 上调用 Future.firstCompletedOf ,直到没有剩下的为止,收集结果 . 它不漂亮,但如果你真的需要你正在谈论的行为,那么这个,或类似的东西可以工作 .

  • 75

    您可以使用承诺,并向其发送第一次失败或最终完成的聚合成功:

    def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
      val p = Promise[M[A]]()
    
      // the first Future to fail completes the promise
      in.foreach(_.onFailure{case i => p.tryFailure(i)})
    
      // if the whole sequence succeeds (i.e. no failures)
      // then the promise is completed with the aggregated success
      Future.sequence(in).foreach(p trySuccess _)
    
      p.future
    }
    

    然后你可以 Await 在那个结果 Future 如果你想阻止,或者只是 map 它变成别的东西 .

    与理解的区别在于,在这里你得到第一个失败的错误,而对于理解,你得到输入集合的遍历顺序中的第一个错误(即使另一个先失败) . 例如:

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this waits one second, then prints "java.lang.ArithmeticException: / by zero"
    // the first to fail in traversal order
    

    和:

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this immediately prints "java.util.NoSuchElementException: None.get"
    // the 'actual' first to fail (usually...)
    // and it returns early (it does not wait 1 sec)
    
  • 7

    这是一个不使用actor的解决方案 .

    import scala.util._
    import scala.concurrent._
    import java.util.concurrent.atomic.AtomicInteger
    
    // Nondeterministic.
    // If any failure, return it immediately, else return the final success.
    def allSucceed[T](fs: Future[T]*): Future[T] = {
      val remaining = new AtomicInteger(fs.length)
    
      val p = promise[T]
    
      fs foreach {
        _ onComplete {
          case s @ Success(_) => {
            if (remaining.decrementAndGet() == 0) {
              // Arbitrarily return the final success
              p tryComplete s
            }
          }
          case f @ Failure(_) => {
            p tryComplete f
          }
        }
      }
    
      p.future
    }
    
  • 5

    你可以单独使用期货来做到这一点 . 这是一个实现 . 请注意,它不会提前终止执行!在这种情况下,您需要做一些更复杂的事情(并且可能自己实施中断) . 但是如果你只是不想继续等待不起作用的东西,那么关键是要等待第一件事情完成,并在没有任何东西或者遇到异常时停止:

    import scala.annotation.tailrec
    import scala.util.{Try, Success, Failure}
    import scala.concurrent._
    import scala.concurrent.duration.Duration
    import ExecutionContext.Implicits.global
    
    @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
    Either[Throwable, Seq[A]] = {
      val first = Future.firstCompletedOf(fs)
      Await.ready(first, Duration.Inf).value match {
        case None => awaitSuccess(fs, done)  // Shouldn't happen!
        case Some(Failure(e)) => Left(e)
        case Some(Success(_)) =>
          val (complete, running) = fs.partition(_.isCompleted)
          val answers = complete.flatMap(_.value)
          answers.find(_.isFailure) match {
            case Some(Failure(e)) => Left(e)
            case _ =>
              if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
              else Right( answers.map(_.get) ++: done )
          }
      }
    }
    

    以下是一切正常的例子:

    scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
      Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
      Future{ Thread.sleep(2000); println("Bye!") }
    ))
    Hi!
    Fancy meeting you here!
    Bye!
    res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
    

    但是当出现问题时:

    scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
      Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
      Future{ Thread.sleep(2000); println("Bye!") }
    ))
    Hi!
    res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
    
    scala> Bye!
    
  • 2

    为此我会使用Akka演员 . 与for-comprehension不同,它会在任何期货失败时立即失败,因此在这个意义上它会更有效率 .

    class ResultCombiner(futs: Future[_]*) extends Actor {
    
      var origSender: ActorRef = null
      var futsRemaining: Set[Future[_]] = futs.toSet
    
      override def receive = {
        case () =>
          origSender = sender
          for(f <- futs)
            f.onComplete(result => self ! if(result.isSuccess) f else false)
        case false =>
          origSender ! SomethingFailed
        case f: Future[_] =>
          futsRemaining -= f
          if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
      }
    
    }
    
    sealed trait Result
    case object SomethingFailed extends Result
    case object EverythingSucceeded extends Result
    

    然后,创建actor,向其发送消息(以便它知道将其回复发送到何处)并等待回复 .

    val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
    try {
      val f4: Future[Result] = actor ? ()
      implicit val timeout = new Timeout(30 seconds) // or whatever
      Await.result(f4, timeout.duration).asInstanceOf[Result] match {
        case SomethingFailed => println("Oh noes!")
        case EverythingSucceeded => println("It all worked!")
      }
    } finally {
      // Avoid memory leaks: destroy the actor
      actor ! PoisonPill
    }
    
  • 5

    这个问题已得到解答,但我发布了我的 Value 类解决方案( Value 类在2.10中添加),因为这里没有一个 . 请随意批评 .

    implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
        def concurrently = ConcurrentFuture(self)
      }
      case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
        def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
        def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
      }
      def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
        val p = Promise[B]()
        val inner = f(outer.future)
        inner.future onFailure { case t => p.tryFailure(t) }
        outer.future onFailure { case t => p.tryFailure(t) }
        inner.future onSuccess { case b => p.trySuccess(b) }
        ConcurrentFuture(p.future)
      }
    

    ConcurrentFuture是一个无开销的Future包装器,它将默认的Future map / flatMap从do-this-then-that更改为all-all-and-fail-if-any-fail . 用法:

    def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
    def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
    def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
    
    val f : Future[(Int,String,Double)] = {
      for {
        f1 <- func1.concurrently
        f2 <- func2.concurrently
        f3 <- func3.concurrently
      } yield for {
       v1 <- f1
       v2 <- f2
       v3 <- f3
      } yield (v1,v2,v3)
    }.future
    f.onFailure { case t => println("future failed $t") }
    

    在上面的示例中,f1,f2和f3将同时运行,如果任何顺序失败,元组的未来将立即失败 .

  • 32

    您可能想要查看Twitter的Future API . 特别是Future.collect方法 . 它完全符合您的要求:https://twitter.github.io/scala_school/finagle.html

    源代码Future.scala可在此处获取:https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

  • 4

    你可以用这个:

    val l = List(1, 6, 8)
    
    val f = l.map{
      i => future {
        println("future " +i)
        Thread.sleep(i* 1000)
        if (i == 12)
          throw new Exception("6 is not legal.")
        i
      }
    }
    
    val f1 = Future.sequence(f)
    
    f1 onSuccess{
      case l => {
        logInfo("onSuccess")
        l.foreach(i => {
    
          logInfo("h : " + i)
    
        })
      }
    }
    
    f1 onFailure{
      case l => {
        logInfo("onFailure")
      }
    

相关问题