首页 文章

斯卡拉等待期货序列

提问于
浏览
22

我希望像下面这样的代码会等待两个期货,但事实并非如此 .

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

我假设 seq.onComplete 会在完成之前等待他们全部完成,但不是这样;它导致:

true
false

.sequence 在scala.concurrent.Future的源代码中有点难以理解,我想知道如何实现一个等待(动态大小)序列的所有原始未来的并行,或者这里可能存在的问题 .

编辑:一个相关的问题:https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

4 回答

  • 1

    等待所有结果(失败与否)的一种常见方法是将未来的新表示失败,以便所有期货都能得到一些结果(尽管它们可能会以表示失败的结果完成) . 一个自然的方法是提升到 Try .

    Twitter's implementation of futures提供了一个简单的 liftToTry 方法,但您可以使用标准库的实现执行类似的操作:

    import scala.util.{ Failure, Success, Try }
    
    val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
      _.map(Success(_)).recover { case t => Failure(t) }
    )
    

    现在 Future.sequence(lifted) 将在每个未来完成时完成,并将使用 Try 表示成功和失败 .

    因此,等待一系列期货的所有原始期货的通用解决方案可能如下所示,假设执行上下文当然是隐式可用的 .

    import scala.util.{ Failure, Success, Try }
    
      private def lift[T](futures: Seq[Future[T]]) = 
        futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
    
      def waitAll[T](futures: Seq[Future[T]]) =
        Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used
    
      waitAll(SeqOfFutures).map { 
        // do whatever with the completed futures
      }
    
  • 29

    Future.sequence 生成的 Future 完成时:

    • 所有期货都已成功完成,或

    • 其中一个期货失败了

    第二点是你的情况正在发生的事情,只要其中一个被包装的 Future 失败就完成了,因为包装 Future 在故障情况下只能容纳一个 Throwable . 等待其他期货没有意义,因为结果将是同样的失败 .

  • 17

    这是支持上一个答案的示例 . 使用标准的Scala API可以轻松实现此目的 .

    在这个例子中,我正在创建3个期货 . 这些将分别在5秒,7秒和9秒完成 . 对 Await.result 的调用将会阻止,直到所有期货都已解决 . 一旦所有3个期货完成, a 将被设置为 List(5,7,9) 并继续执行 .

    此外,如果在任何期货中抛出异常, Await.result 将立即解除阻止并抛出异常 . 取消注释 Exception(...) 行以查看此操作 .

    try {
        val a = Await.result(Future.sequence(Seq(
          Future({
            blocking {
              Thread.sleep(5000)
            }
            System.err.println("A")
            5
          }),
          Future({
            blocking {
              Thread.sleep(7000)
            }
            System.err.println("B")
            7
            //throw new Exception("Ha!")
          }),
          Future({
            blocking {
              Thread.sleep(9000)
            }
            System.err.println("C")
            9
          }))),
          Duration("100 sec"))
    
        System.err.println(a)
      } catch {
        case e: Exception ⇒
          e.printStackTrace()
      }
    
  • 2

    我们可以通过隐式类使用自己的 onComplete 方法来丰富 Seq[Future[T]]

    def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
        f map { Success(_) } recover { case e => Failure(e) }
    
      def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
        fs map { lift(_) }
    
      implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
        def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
          Future.sequence(lift(fs)) onComplete {
            case Success(s) => f(s)
            case Failure(e) => throw e // will never happen, because of the Try lifting
          }
        }
      }
    

    然后,在您的特定MWE中,您可以执行以下操作:

    val f1 = Future {
        throw new Throwable("baaa") // emulating a future that bumped into an exception
      }
    
      val f2 = Future {
        Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
        2
      }
    
      val lf = List(f1, f2)
    
      lf onComplete { _ map {
        case Success(v) => ???
        case Failure(e) => ???
      }}
    

    这种解决方案的优势在于,您可以像在单个未来那样在期货序列上调用 onComplete .

相关问题