首页 文章

Scala未来序列和超时处理

提问于
浏览
6

如何结合期货with timeouts有一些很好的提示 . 不过我很好奇如何用 Future sequence sequenceOfFutures 做到这一点

我的第一种方法看起来像这样

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._

object FutureSequenceScala extends App {
  println("Creating futureList")

  val timeout = 2 seconds
  val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
    val f = future {
      Thread sleep ms
      ms toString
    }
    Future firstCompletedOf Seq(f, fallback(timeout))
  }

  println("Creating waitinglist")
  val waitingList = Future sequence futures
  println("Created")

  val results = Await result (waitingList, timeout * futures.size)
  println(results)

  def fallback(timeout: Duration) = future {
    Thread sleep (timeout toMillis)
    "-1"
  }
}

有没有更好的方法来处理一系列期货中的超时或这是一个有效的解决方案?

3 回答

  • 8

    您的代码中有一些内容可能需要重新考虑 . 对于初学者来说,我并不是将任务提交到_908676中的忠实粉丝,其唯一目的是模拟超时并且还使用 Thread.sleep . sleep 调用是阻塞的,您可能希望避免在执行上下文中有一个任务,为了等待一段固定的时间而完全阻塞 . 我将从我的回答中窃取here并建议对于纯超时处理,你应该使用我在答案中概述的内容 . HashedWheelTimer 是一个高效的计时器实现,比只是睡眠的任务更适合超时处理 .

    现在,如果你走这条路线,下一次改变我会建议处理每个未来的个别超时相关故障 . 如果您希望单个故障完全失败,从 sequence 调用返回的聚合 Future ,则不执行任何额外操作 . 如果您不希望发生这种情况,而是希望超时返回一些默认值,那么您可以在 Future 上使用 recover ,如下所示:

    withTimeout(someFuture).recover{
      case ex:TimeoutException => someDefaultValue
    }
    

    完成后,您可以利用非阻塞回调并执行以下操作:

    waitingList onComplete{
      case Success(results) => //handle success
      case Failure(ex) => //handle fail
    }
    

    每个未来都有一个超时,因此不会无限运行 . 没有必要IMO阻止那里并通过 atMost param提供额外的超时处理层到 Await.result . 但我想这可以假设您对非阻塞方法没问题 . 如果你真的需要阻止,那么你不应该等待 timeout * futures.size 的时间 . 这些期货并行运行;那里的超时应该只需要与期货本身的个别超时一样长(或者只是稍微长一点来解释cpu /时间的任何延迟) . 它当然不应该是超时*期货的总数 .

  • 1

    这是一个显示阻止 fallback 有多糟糕的版本 .

    请注意,执行程序是单线程的,您创建了许多后备 .

    @cmbaxter是对的,你的主超时不应该 timeout * futures.size ,它应该更大!

    @cmbaxter也是对的,你想要非阻塞 . 一旦你这样做,并且你想要施加超时,那么你将选择一个计时器组件,查看他的链接答案(也链接到你的链接答案) .

    也就是说,我仍然喜欢my answer from your link,因为坐在循环中等待下一个应该超时的事情真的很简单 .

    它只需要一份期货清单及其超时和后备 Value .

    也许有一个用例,例如一个简单的应用程序只是阻止一些结果(如你的测试),并且必须在结果出来之前退出 .

    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext
    
    import java.util.concurrent.Executors
    import java.lang.System.{ nanoTime => now }
    
    object Test extends App { 
      //implicit val xc = ExecutionContext.global
      implicit val xc = ExecutionContext fromExecutorService (Executors.newSingleThreadExecutor)
    
      def timed[A](body: =>A): A = {
        val start = now 
        val res = body
        val end = now
        Console println (Duration fromNanos end-start).toMillis + " " + res
        res
      }
      println("Creating futureList")
    
      val timeout = 1500 millis
      val futures = List(1000, 1500, 1200, 800, 2000) map { ms =>
        val f = future {
          timed {
            blocking(Thread sleep ms)
            ms toString
          }
        } 
        Future firstCompletedOf Seq(f, fallback(timeout))
      }   
    
      println("Creating waitinglist")
      val waitingList = Future sequence futures
      println("Created")
    
      timed {
      val results = Await result (waitingList, 2 * timeout * futures.size)
      println(results)
      }     
      xc.shutdown
    
      def fallback(timeout: Duration) = future {
        timed {
          blocking(Thread sleep (timeout toMillis))
          "-1"
        }
      }   
    }
    

    发生了什么:

    Creating futureList
    Creating waitinglist
    Created
    1001 1000
    1500 -1
    1500 1500
    1500 -1
    1200 1200
    1500 -1
    800 800
    1500 -1
    2000 2000
    1500 -1
    List(1000, 1500, 1200, 800, 2000)
    14007 ()
    
  • 1

    Monix Task有timeout支持:

    import monix.execution.Scheduler.Implicits.global
      import monix.eval._
      import scala.concurrent.duration._
    
      println("Creating futureList")
      val tasks = List(1000, 1500, 1200, 800, 2000).map{ ms =>
        Task {
          Thread.sleep(ms)
          ms.toString
        }.timeoutTo(2.seconds, Task.now("-1"))
      }
    
      println("Creating waitinglist")
      val waitingList = Task.gather(tasks) // Task.sequence is true/literally "sequencing" operation
    
      println("Created")
      val results = Await.result(waitingList, timeout * futures.size)
      println(results)
    

相关问题