首页 文章

Scala:等待一系列期货的超时,然后收集完成的结果

提问于
浏览
0

情况:

有许多阻塞同步调用(这是一个无法更改的给定),这可能需要很长时间才能聚合结果 .

目标:

使呼叫无阻塞,然后等待最长时间(ms)并收集所有已成功的呼叫,即使有些可能因为超时而失败(因此我们可以降低失败呼叫的功能) .

当前解决方案:

下面的解决方案通过组合期货来工作,等待那个期货完成或超时,并且在NonFatal错误(超时)的情况下,它使用 completedFutureValues 方法提取成功完成的期货 .

import scala.concurrent.{Await, Future}
  import scala.util.Random._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

  def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }

  // use the same method 3 times, but in reality is different methods (with different types)
  val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1))
  val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2))
  val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3))

  val combinedFuture: Future[(String, String, String)] = for {
    hw1 <- futureHelloWorld1
    hw2 <- futureHelloWorld2
    hw3 <- futureHelloWorld3
  } yield (hw1, hw2, hw3)

  val res = try {
    Await.result(combinedFuture, 250.milliseconds)
  } catch {
    case NonFatal(_) => {
      (
        completedFutureValue(futureHelloWorld1, "fallback hello world 1"),
        completedFutureValue(futureHelloWorld2, "fallback hello world 2"),
        completedFutureValue(futureHelloWorld3, "fallback hello world 3")
      )
    }
  }

  def completedFutureValue[T](future: Future[T], fallback: T): T =
    future.value match {
      case Some(Success(value)) => value
      case Some(Failure(e)) =>
        fallback
      case None =>
        fallback
    }

它将返回tuple3,其中包含已完成的未来结果或后备,例如: (hello world,fallback hello world 2,fallback hello world 3)

虽然这有效,但我对此并不特别满意 .

问题:

我们怎样才能改进这一点?

5 回答

  • 1

    如果我也可以建议一种方法 . 想法是避免一起阻塞,并在每个未来实际设置超时 . 这是一篇博文,我发现在做我的例子时非常有用,它有点旧,但是黄金的东西:

    https://nami.me/2015/01/20/scala-futures-with-timeout/

    这里的一个负面观点是你可能需要在解决方案中添加akka,但是再次它并不完全丑陋:

    import akka.actor.ActorSystem
      import akka.pattern.after
    
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration.{FiniteDuration, _}
      import scala.concurrent.{Await, Future}
      import scala.util.Random._
    
      implicit val system = ActorSystem("theSystem")
    
      implicit class FutureExtensions[T](f: Future[T]) {
        def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
          Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
        }
      }
    
      def potentialLongBlockingHelloWorld(i: Int): String = {
        Thread.sleep(nextInt(500)); s"hello world $i"
      }
    
      implicit val timeout: FiniteDuration = 250.milliseconds
    
      val timeoutException = new TimeoutException("Future timed out!")
    
      // use the same method 3 times, but in reality is different methods (with different types)
      val futureHelloWorld1 = Future(potentialLongBlockingHelloWorld(1)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 1") }
      val futureHelloWorld2 = Future(potentialLongBlockingHelloWorld(2)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 2") }
      val futureHelloWorld3 = Future(potentialLongBlockingHelloWorld(3)).withTimeout(timeoutException).recoverWith { case _ ⇒ Future.successful("fallback hello world 3") }
    
      val results = Seq(futureHelloWorld1, futureHelloWorld2, futureHelloWorld3)
    
      val combinedFuture = Future.sequence(results)
    
      // this is just to show what you would have in your future
      // combinedFuture is not blocking anything
      val justToShow = Await.result(combinedFuture, 1.seconds)
      println(justToShow)
      // some of my runs:
      // List(hello world 1, hello world 2, fallback hello world 3)
      // List(fallback hello world 1, fallback hello world 2, hello world 3)
    

    使用这种方法没有阻塞,你在每个阶段都有一个超时,这样你就可以微调并适应你真正需要的东西 . 我正在使用的只是为了展示它是如何工作的 .

  • 0

    为什么不写:

    val futures: f1 :: f2 :: f3 :: Nil
    val results = futures map { f =>
        Await.result(f, yourTimeOut)
    }
    results.collect {
        case Success => /* your logic */
    }
    

    ???

  • 0

    在这里发布由同事提供的解决方案,该解决方案基本上与问题中提供的解决方案相同,但使其更加清洁 .

    使用他的解决方案可以写:

    (
      Recoverable(futureHelloWorld1, "fallback hello world 1"),
      Recoverable(futureHelloWorld2, "fallback hello world 1"),
      Recoverable(futureHelloWorld3, "fallback hello world 1")
    ).fallbackAfter(250.milliseconds) {
      case (hw1, hw2, hw3) =>
        // Do something with the results.
        println(hw1.value)
        println(hw2.value)
        println(hw3.value)
    }
    

    这可以使用期货元组和后备 . 使这成为可能的代码:

    import org.slf4j.LoggerFactory
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
    import scala.util.Try
    import scala.util.control.NonFatal
    
    sealed abstract class FallbackFuture[T] private(private val future: Future[T]) {
      def value: T
    }
    
    object FallbackFuture {
      final case class Recoverable[T](future: Future[T], fallback: T) extends FallbackFuture[T](future) {
        override def value: T = {
          if (future.isCompleted) future.value.flatMap(t => t.toOption).getOrElse(fallback)
          else fallback
        }
      }
    
      object Recoverable {
        def apply[T](fun: => T, fallback: T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
          new Recoverable[T](Future(fun), fallback)
        }
      }
    
      final case class Irrecoverable[T](future: Future[T]) extends FallbackFuture[T](future) {
        override def value: T = {
          def except = throw new IllegalAccessException("Required future did not compelete before timeout")
          if (future.isCompleted) future.value.flatMap(_.toOption).getOrElse(except)
          else except
        }
      }
    
      object Irrecoverable {
        def apply[T](fun: => T)(implicit ec: ExecutionContext): FallbackFuture[T] = {
          new Irrecoverable[T](Future(fun))
        }
      }
    
      object Implicits {
        private val logger = LoggerFactory.getLogger(Implicits.getClass)
    
        type FF[X] = FallbackFuture[X]
    
        implicit class Tuple2Ops[V1, V2](t: (FF[V1], FF[V2])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple3Ops[V1, V2, V3](t: (FF[V1], FF[V2], FF[V3])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple4Ops[V1, V2, V3, V4](t: (FF[V1], FF[V2], FF[V3], FF[V4])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple5Ops[V1, V2, V3, V4, V5](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple6Ops[V1, V2, V3, V4, V5, V6](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple7Ops[V1, V2, V3, V4, V5, V6, V7](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple8Ops[V1, V2, V3, V4, V5, V6, V7, V8](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple9Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        implicit class Tuple10Ops[V1, V2, V3, V4, V5, V6, V7, V8, V9, V10](t: (FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) {
          def fallbackAfter[R](timeout: Duration)(fn: ((FF[V1], FF[V2], FF[V3], FF[V4], FF[V5], FF[V6], FF[V7], FF[V8], FF[V9], FF[V10])) => R): R =
            awaitAll(timeout, t) {
              fn(t)
            }
        }
    
        private implicit def toFutures(fallbackFuturesTuple: Product): Seq[Future[Any]] = {
          fallbackFuturesTuple.productIterator.toList
            .map(_.asInstanceOf[FallbackFuture[Any]])
            .map(_.future)
        }
    
        private def awaitAll[R](timeout: Duration, futureSeq: Seq[Future[Any]])(fn: => R) = {
          Try {
            Await.ready(Future.sequence(futureSeq), timeout)
          } recover {
            case _: TimeoutException => logger.warn("Call timed out")
            case NonFatal(ex) => throw ex
          }
          fn
        }
      }
    }
    
  • 0

    可能最好使用Future.sequence()从Collection [Future]返回Future [Collection]

  • 1

    一旦(据我所知)你将阻止当前线程并同步等待结果,我会说最简单的解决方案应该是:

    import java.util.concurrent.atomic.AtomicReference
    
    import scala.concurrent.{Await, Future}
    import scala.util.Random._
    import scala.concurrent.ExecutionContext.Implicits.global
    
    def potentialLongBlockingHelloWorld(i: Int): String = {Thread.sleep(nextInt(500)); s"hello world $i" }
    
    
    // init with fallback
    val result1 = new AtomicReference[String]("fallback hello world 1")
    val result2 = new AtomicReference[String]("fallback hello world 2")
    val result3 = new AtomicReference[String]("fallback hello world 3")
    
    // use the same method 3 times, but in reality is different methods (with different types)
    val f1 = Future(potentialLongBlockingHelloWorld(1)).map {res =>
      result1.set(res)
    }
    val f2 = Future(potentialLongBlockingHelloWorld(2)).map {res =>
      result2.set(res)
    }
    val f3 = Future(potentialLongBlockingHelloWorld(3)).map {res =>
      result1.set(res)
    }
    
    for (i <- 1 to 5 if !(f1.isCompleted && f2.isCompleted && f3.isCompleted)) {
      Thread.sleep(50)
    }
    
    (result1.get(), result2.get(), result3.get())
    

    在这里,您只需在AtomicReferences中引入结果,这些结果将在未来完成时更新,并检查所有期货已完成或最多250毫秒(超时)的刻度结果 .

    或者,您可以使用回退和超时从here extend获得 Future with timeout 实现,而不仅仅使用 Future.sequence 和Await,并保证所有 Futures 将在成功或回退时及时完成 .

相关问题