首页 文章

Scala Futures - 内置超时?

提问于
浏览
48

从官方教程参考中我还没有完全理解未来的一个方面 . http://docs.scala-lang.org/overviews/core/futures.html

scala中的期货是否具有某种内置的超时机制?假设下面的例子是一个5千兆字节的文本文件......“Implicits.global”的隐含范围是否会导致onFailure以非阻塞方式触发或者可以定义?没有某种默认的超时时间,这是否意味着它既不会成功也不会失败?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

10 回答

  • 3

    当您使用阻止来获取 Future 的结果时,您只会获得超时行为 . 如果要使用非阻塞回调 onCompleteonSuccessonFailure ,则必须滚动自己的超时处理 . Akka内置了针对演员之间的请求/响应( ? )消息传递的超时处理,但不确定是否要开始使用Akka . FWIW,在Akka中,对于超时处理,它们通过 Future.firstCompletedOf 一起组成两个 Futures ,一个代表实际的异步任务,另一个代表超时 . 如果超时计时器(通过 HashedWheelTimer )首先弹出,则异步回调会出现故障 .

    滚动自己的一个非常简单的例子可能是这样的 . 首先,一个用于调度超时的对象:

    import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.Duration
    import scala.concurrent.Promise
    import java.util.concurrent.TimeoutException
    
    object TimeoutScheduler{
      val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
      def scheduleTimeout(promise:Promise[_], after:Duration) = {
        timer.newTimeout(new TimerTask{
          def run(timeout:Timeout){              
            promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
          }
        }, after.toNanos, TimeUnit.NANOSECONDS)
      }
    }
    

    然后是一个函数来获取Future并为其添加超时行为:

    import scala.concurrent.{Future, ExecutionContext, Promise}
    import scala.concurrent.duration.Duration
    
    def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
      val prom = Promise[T]()
      val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
      val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
      fut onComplete{case result => timeout.cancel()}
      combinedFut
    }
    

    请注意,我在这里使用的 HashedWheelTimer 来自Netty .

  • 3

    我刚为同事创建了一个 TimeoutFuture 类:

    TimeoutFuture

    package model
    
    import scala.concurrent._
    import scala.concurrent.duration._
    import play.libs.Akka
    import play.api.libs.concurrent.Execution.Implicits._
    
    object TimeoutFuture {
      def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    
        val prom = promise[A]
    
        // timeout logic
        Akka.system.scheduler.scheduleOnce(timeout) {
          prom tryFailure new java.util.concurrent.TimeoutException
        }
    
        // business logic
        Future { 
          prom success block
        }
    
        prom.future
      } 
    }
    

    用法

    val future = TimeoutFuture(10 seconds) { 
      // do stuff here
    }
    
    future onComplete {
      case Success(stuff) => // use "stuff"
      case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
    }
    

    注意:

    • 假设玩!框架(但很容易适应)

    • 每段代码都在同一个 ExecutionContext 中运行,这可能并不理想 .

  • 2

    所有这些答案都需要额外的依赖性 . 我决定使用java.util.Timer编写一个版本,这是将来运行函数的有效方法,在这种情况下触发超时 .

    Blog post with more details here

    使用Scala的Promise,我们可以使用超时生成Future,如下所示:

    package justinhj.concurrency
    
    import java.util.concurrent.TimeoutException
    import java.util.{Timer, TimerTask}
    
    import scala.concurrent.duration.FiniteDuration
    import scala.concurrent.{ExecutionContext, Future, Promise}
    import scala.language.postfixOps
    
    object FutureUtil {
    
      // All Future's that use futureWithTimeout will use the same Timer object
      // it is thread safe and scales to thousands of active timers
      // The true parameter ensures that timeout timers are daemon threads and do not stop
      // the program from shutting down
    
      val timer: Timer = new Timer(true)
    
      /**
        * Returns the result of the provided future within the given time or a timeout exception, whichever is first
        * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
        * Thread.sleep would
        * @param future Caller passes a future to execute
        * @param timeout Time before we return a Timeout exception instead of future's outcome
        * @return Future[T]
        */
      def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
    
        // Promise will be fulfilled with either the callers Future or the timer task if it times out
        val p = Promise[T]
    
        // and a Timer task to handle timing out
    
        val timerTask = new TimerTask() {
          def run() : Unit = {
                p.tryFailure(new TimeoutException())
            }
          }
    
        // Set the timeout to check in the future
        timer.schedule(timerTask, timeout.toMillis)
    
        future.map {
          a =>
            if(p.trySuccess(a)) {
              timerTask.cancel()
            }
        }
        .recover {
          case e: Exception =>
            if(p.tryFailure(e)) {
              timerTask.cancel()
            }
        }
    
        p.future
      }
    
    }
    
  • 1

    Play框架包含Promise.timeout,因此您可以编写如下代码

    private def get(): Future[Option[Boolean]] = {
      val timeoutFuture = Promise.timeout(None, Duration("1s"))
      val mayBeHaveData = Future{
        // do something
        Some(true)
      }
    
      // if timeout occurred then None will be result of method
      Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
    }
    
  • 63

    您可以在等待未来时指定超时:

    对于 scala.concurrent.Futureresult 方法允许您指定超时 .

    对于 scala.actors.FutureFutures.awaitAll 允许您指定超时 .

    我认为Future的执行没有内置超时 .

  • 21

    还没有人提到 akka-streams . 这些流有一个简单的completionTimeout方法,将其应用于单一源流就像Future一样 .

    但是,akka-stream也会取消,因此它实际上可以从运行中结束源,即它向源发出超时信号 .

  • 5

    如果您希望编写者(承诺持有者)成为控制超时逻辑的人,请按以下方式使用akka.pattern.after

    val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
    Future.firstCompletedOf(Seq(promiseRef.future, timeout))
    

    这样,如果您的承诺完成逻辑永远不会发生,那么您的调用者的未来仍将在某个时刻失败完成 .

  • 3

    我很惊讶这不是Scala的标准 . 我的版本很短,没有依赖关系

    import scala.concurrent.Future
    
    sealed class TimeoutException extends RuntimeException
    
    object FutureTimeout {
    
      import scala.concurrent.ExecutionContext.Implicits.global
    
      implicit class FutureTimeoutLike[T](f: Future[T]) {
        def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
          Thread.sleep(ms)
          throw new TimeoutException
        }))
    
        lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
      }
    
    }
    

    用法示例

    import FutureTimeout._
    Future { /* do smth */ } withTimeout
    
  • 14

    Monix Task 已超时support

    import monix.execution.Scheduler.Implicits.global
    import monix.eval._
    import scala.concurrent.duration._
    import scala.concurrent.TimeoutException
    
    val source = Task("Hello!").delayExecution(10.seconds)
    
    // Triggers error if the source does not complete in 3 seconds after runOnComplete
    val timedOut = source.timeout(3.seconds)
    
    timedOut.runOnComplete(r => println(r))
    //=> Failure(TimeoutException)
    
  • 0

    我正在使用这个版本(基于上面的Play示例),它使用Akka系统调度程序:

    object TimeoutFuture {
      def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = {
        implicit val executionContext = system.dispatcher
    
        val prom = Promise[A]
    
        // timeout logic
        system.scheduler.scheduleOnce(timeout) {
          prom tryFailure new java.util.concurrent.TimeoutException
        }
    
        // business logic
        Future {
          try {
            prom success block
          } catch {
            case t: Throwable => prom tryFailure t
          }
        }
    
        prom.future
      }
    }
    

相关问题