首页 文章

如何在Scala中实现Future作为应用?

提问于
浏览
6

假设我需要运行两个并发计算,等待它们,然后组合它们的结果 . 更具体地说,我需要同时运行 f1: X1 => Y1f2: X2 => Y2 ,然后调用 f: (Y1, Y2) => Y 以最终获得 Y 的值 .

我可以创建未来的计算 fut1: X1 => Future[Y1]fut2: X2 => Future[Y2] ,然后使用monadic组合将它们组合成 fut: (X1, X2) => Future[Y] .

问题是monadic组合意味着顺序等待 . 在我们的例子中,它意味着我们先等待一个未来,然后我们将等待另一个未来 . 例如 . 如果它需要2秒 . 到第一个未来完成,只需1秒 . 到第二个未来失败,我们浪费1秒 .

因此,看起来我们需要期货的应用组合等待,直到完成或至少一个未来失败 . 是否有意义 ?你如何实现期货的 <*>

4 回答

  • 2

    其他答案中的所有方法都没有在未来快速失败的情况下做正确的事情,以及在很长一段时间后成功的未来 .

    但是这种方法可以手动实现:

    def smartSequence[A](futures: Seq[Future[A]]): Future[Seq[A]] = {
      val counter = new AtomicInteger(futures.size)
      val result = Promise[Seq[A]]()
    
      def attemptComplete(t: Try[A]): Unit = {
        val remaining = counter.decrementAndGet
        t match {
          // If one future fails, fail the result immediately
          case Failure(cause) => result tryFailure cause
          // If all futures have succeeded, complete successful result
          case Success(_) if remaining == 0 => 
            result tryCompleteWith Future.sequence(futures)
          case _ =>
        }
      }
    
      futures.foreach(_ onComplete attemptComplete)
      result.future
    }
    

    ScalaZ在内部做了类似的事情,因此在任何期货失败后, f1 |@| f2 和_2639419都会立即失败 .

    以下是对这些方法失败时间的快速测试:

    import java.util.Date
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scalaz._, Scalaz._
    
    object ReflectionTest extends App {
      def f1: Future[Unit] = Future {
        Thread.sleep(2000)
      }
    
      def f2: Future[Unit] = Future {
        Thread.sleep(1000)
        throw new RuntimeException("Failure")
      }
    
      def test(name: String)(
        f: (Future[Unit], Future[Unit]) => Future[Unit]
      ): Unit = {
        val start = new Date().getTime
        f(f1, f2).andThen {
          case _ => 
            println(s"Test $name completed in ${new Date().getTime - start}")
        }
        Thread.sleep(2200)
      }
    
      test("monadic") { (f1, f2) => for (v1 <- f1; v2 <- f2) yield () }
    
      test("zip") { (f1, f2) => (f1 zip f2).map(_ => ()) }
    
      test("Future.sequence") { 
        (f1, f2) => Future.sequence(Seq(f1, f2)).map(_ => ()) 
      }
    
      test("smartSequence") { (f1, f2) => smartSequence(Seq(f1, f2)).map(_ => ())}
    
      test("scalaz |@|") { (f1, f2) => (f1 |@| f2) { case _ => ()}}
    
      test("scalaz sequence") { (f1, f2) => List(f1, f2).sequence.map(_ => ())}
    
      Thread.sleep(30000)
    }
    

    我的机器上的结果是:

    Test monadic completed in 2281
    Test zip completed in 2008
    Test Future.sequence completed in 2007
    Test smartSequence completed in 1005
    Test scalaz |@| completed in 1003
    Test scalaz sequence completed in 1005
    
  • 5

    问题是monadic组合意味着顺序等待 . 在我们的例子中,它意味着我们先等待一个未来,然后我们将等待另一个未来 .

    不幸的是,这是真的 .

    import java.util.Date
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object Test extends App {
            def timestamp(label: String): Unit = Console.println(label + ": " + new Date().getTime.toString)
    
            timestamp("Start")
            for {
                    step1 <- Future {
                            Thread.sleep(2000)
                            timestamp("step1")
                    }
                    step2 <- Future {
                            Thread.sleep(1000)
                            timestamp("step2")
                    }
            } yield { timestamp("Done") }
    
            Thread.sleep(4000)
    }
    

    运行此代码输出:

    Start: 1430473518753
    step1: 1430473520778
    step2: 1430473521780
    Done: 1430473521781
    

    因此看起来我们需要期货的应用组合等到完成或至少一个未来都失败 .

    我不确定应用程序组成与并发策略有什么关系 . 使用 for comprehensions,如果所有期货完成或者如果其中任何一个失败,则会得到结果 . 所以它在语义上是一样的 .

    为什么它们按顺序运行

    我认为期货顺序运行的原因是因为 step1step2 (以及其余计算中)可用 . 基本上我们可以将 for 块转换为:

    def step1() = Future {
        Thread.sleep(2000)
        timestamp("step1")
    }
    def step2() = Future {
        Thread.sleep(1000)
        timestamp("step2")
    }
    def finalStep() = timestamp("Done")
    step1().flatMap(step1 => step2()).map(finalStep())
    

    因此,先前计算的结果可用于其余步骤 . 它在这方面与 <?><*> 不同 .

    如何并行运行期货

    @ andrey-tyukin的代码并行运行期货:

    import java.util.Date
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object Test extends App {
            def timestamp(label: String): Unit = Console.println(label + ": " + new Date().getTime.toString)
    
            timestamp("Start")
            (Future {
                    Thread.sleep(2000)
                    timestamp("step1")
            } zip Future {
                    Thread.sleep(1000)
                    timestamp("step2")
            }).map(_ => timestamp("Done"))
            Thread.sleep(4000)
    }
    

    输出:

    Start: 1430474667418
    step2: 1430474668444
    step1: 1430474669444
    Done: 1430474669446
    
  • 2

    您的帖子似乎包含两个或多或少独立的问题 . 我将首先解决运行两个并发计算的具体实际问题 . 最后回答了关于 Applicative 的问题 .

    假设您有两个异步函数:

    val f1: X1 => Future[Y1]
    val f2: X2 => Future[Y2]
    

    还有两个值:

    val x1: X1
    val x2: X2
    

    现在,您可以通过多种不同方式开始计算 . 我们来看看其中的一些 .

    Starting computations outside of for (parallel)

    假设你这样做:

    val y1: Future[Y1] = f1(x1)
    val y2: Future[Y2] = f2(x2)
    

    现在,计算 f1f2 已经在运行 . 您收集结果的顺序无关紧要 . 你可以用 for -理解来做到这一点:

    val y: Future[(Y1,Y2)] = for(res1 <- y1; res2 <- y2) yield (res1,res2)
    

    for -comprehension中使用表达式 y1y2 不会干扰 y1y2 的计算顺序,它们仍然是并行计算的 .

    Starting computations inside of for (sequential)

    如果我们只是简单地采用 y1y2 的定义,并将它们直接插入到 for 理解中,我们仍会得到相同的结果,但执行顺序会有所不同:

    val y = for (res1 <- f1(x1); res2 <- f2(x2)) yield (res1, res2)
    

    翻译成

    val y = f1(x1).flatMap{ res1 => f2(x2).map{ res2 => (res1, res2) } }
    

    特别是,第二次计算在第一次计算终止后开始 . 这通常不是人们想拥有的 .

    这里违反了基本替代原则 . 如果没有副作用,可能会将此版本转换为前一版本,但在Scala中,必须明确地处理执行顺序 .

    Zipping futures (parallel)

    期货尊重产品 . 有一个方法 Future.zip ,它允许您这样做:

    val y = f1(x1) zip f2(x2)
    

    这将并行运行两个计算,直到完成两个计算,或者直到其中一个计算失败 .

    Demo

    这是一个演示此行为的小脚本(受 muhuk 帖子的启发):

    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import java.lang.Thread.sleep
    import java.lang.System.{currentTimeMillis => millis}
    
    var time: Long = 0
    
    val x1 = 1
    val x2 = 2
    
    // this function just waits
    val f1: Int => Future[Unit] = { 
      x => Future { sleep(x * 1000) }
    }
    
    // this function waits and then prints
    // elapsed time
    val f2: Int => Future[Unit] = {
      x => Future { 
        sleep(x * 1000)
        val elapsed = millis() - time
        printf("Time: %1.3f seconds\n", elapsed / 1000.0)
      }
    }
    
    /* Outside `for` */ {
      time = millis()
      val y1 = f1(x1)
      val y2 = f2(x2)
      val y = for(res1 <- y1; res2 <- y2) yield (res1,res2)
      Await.result(y, Duration.Inf)
    }
    
    /* Inside `for` */ {
      time = millis()
      val y = for(res1 <- f1(x1); res2 <- f2(x2)) yield (res1, res2)
      Await.result(y, Duration.Inf)
    }
    
    /* Zip */ {
      time = millis()
      val y = f1(x1) zip f2(x2)
      Await.result(y, Duration.Inf)
    }
    

    输出:

    Time: 2.028 seconds
    Time: 3.001 seconds
    Time: 2.001 seconds
    

    Applicative

    使用your other post中的此定义:

    trait Applicative[F[_]] {
      def apply[A, B](f: F[A => B]): F[A] => F[B]
    }
    

    一个人可以这样做:

    object FutureApplicative extends Applicative[Future] {
      def apply[A, B](ff: Future[A => B]): Future[A] => Future[B] = {
        fa => for ((f,a) <- ff zip fa) yield f(a)
      }
    }
    

    但是,我不确定这与你的具体问题有关,或者与之有关可理解和可读的代码 . Future 已经是一个monad(这比 Applicative 强),甚至还有内置的语法,所以我认为在这里添加一些 Applicative 没有任何优势 .

  • 1

    它不需要是顺序的 . 未来的计算可能会在创建未来的那一刻开始 . 当然,如果未来是由flatMap参数创建的(如果它需要第一次计算的结果则必须如此),那么它将是顺序的 . 但在诸如此类的代码中

    val f1 = Future {....}
    val f2 = Future {....}
    for (a1 <- f1; a2 <- f2) yield f(a1, a2)
    

    你得到并发执行 .

    所以Monad所暗示的Applicative的实现是可以的 .

相关问题