首页 文章

Scala的Future和ExecutionContext执行

提问于
浏览
1

假设我有以下一组代码在未来做了一些事情:

1 to 10 foreach {
  case x => Future { x + x }
}

假设我给这段代码提供了默认的ExecutionContext,我知道后台会发生什么,但我想知道的是如何实现Future的处理呢?我的意思是应该有一些线程或一组线程可能等待Future完成?这些线程被阻止了吗?在他们真正等待未来完成的意义上阻止?

现在在以下场景中:

val x: Future[MyType] = finishInSomeFuture()

假设x有一个超时,我可以像这样调用:

Future {
  blocking {
    x.get(3, TimeOut.SECONDS)
  }
}

我真的在阻止吗?是否有更好的异步超时方法?

编辑:下面的Timeout比我上面定义的阻塞上下文有多么不同或有多好?

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}

1 回答

  • 6
    • 假设我有以下一组代码在未来做了一些事情:1到10个foreach {
      case x =>未来

      }
      ...

    您的一段代码会创建十个 Futures ,它们会立即使用隐式ExecutionContext提供的线程设置执行 . 当您等待执行时,您的主线程(定义了 foreach )不会阻止并立即继续执行 . 如果那段代码在 main 方法的末尾,那么,取决于 ThreadFactoryExecutionContext 生成的daemon threads程序是否可以退出而无需等待Futures完成 .

    • 现在在以下场景中:val x:Future [MyType] = finishInSomeFuture()
      假设x有一个超时,我可以像这样调用:Future {
      阻止{
      x.get(3,TimeOut.SECONDS)
      }
      }
      我真的在阻止吗?是否有更好的异步超时方法?

    你可能意味着 Await.result 而不是 x.get

    def inefficientTimeoutFuture[T](f:Future[T], x:Duration) = Future { Await.result(f, x) }
    

    在这种情况下,未来 f 将在单独的线程中计算,而其他线程将被阻止等待 f 的计算 .

    Using scheduler to create TimeoutFuture更有效,因为调度程序通常共享固定数量的线程(通常是一个),而 Await.result 中的阻塞总是需要额外的线程来阻止 .

    • 我想知道如何在不阻塞的情况下超时?

    使用调度程序创建TimeoutFuture允许您在不阻塞的情况下超时运行 . 你正在将你的Future包装在超时助手中,新的Future要么成功完成,要么因超时而失败(无论什么是第一次) . 新的Future具有相同的异步性质,由您决定如何使用它(注册onComplete回调或同步等待结果,阻塞主线程) .


    UPD 我将尝试澄清有关多线程和阻塞的一些基本问题 .

    现在,异步非阻塞方法是趋势,但您必须了解阻塞意味着什么以及为什么应该避免它 .

    Java中的每个线程都需要付出代价 . 首先,创建新线程(这就是存在线程池的原因)相对昂贵,其次,它会消耗内存 . 为什么不用CPU?因为您的CPU资源受到您拥有的核心数量的限制 . 无论您拥有多少活动线程,您的并行度级别始终都会受到核心数量的限制 . 如果线程处于非活动状态(被阻塞),则它不会占用CPU .

    在当代Java应用程序中,您可以创建相当多的线程(数千个) . 问题是,在某些情况下,您无法预测您将需要多少线程 . 这就是异步方法发挥作用的时候 . 它说:而不是阻止当前线程,而其他一些线程做他们的工作让我们在回调中包装我们的后续步骤并将当前线程返回到池,因此它可以做一些其他有用的工作 . 因此几乎所有线程都在忙于实际工作而不是仅仅等待和消耗内存 .

    现在以计时器为例 . 如果您使用基于netty的 HashedWheelTimer ,您可以使用单线程支持它,并安排数千个事件 . 当您创建被阻止等待超时的 Future 时,每个"schedule"占用一个线程 . 因此,如果您计划了数千次超时,那么最终会有数千个被阻塞的线程(这会消耗内存,而不是cpu) .

    现在你的"main" future(你想要在超时中换行)也不必阻塞线程 . 例如,如果您在将来执行同步http请求,您的线程将被阻止,但如果您使用基于netty的 AsyncHttpClient (例如),则可以使用不占用该线程的基于承诺的未来 . 在这种情况下,您可以使用少量固定数量的线程来处理任意数量的请求(数十万个) .


    UPD2

    但是应该有一些线程应该阻塞,即使在Timer的情况下,因为它必须等待Timeout millis . 那么益处是什么?我仍然阻止,但可能是我在Timer情况下阻止或?

    这仅适用于一种特定情况:当您拥有等待异步任务的主线程时完成 . 在这种情况下你是对的,没有阻塞主线程就无法在超时中包装操作 . 在这种情况下使用Timers没有任何意义 . 你只需要额外的线程来执行你的操作,而主线程等待结果或超时 .

    但通常 Futures 用于更复杂的场景,其中没有"main"线程 . 例如,假设异步Web服务器,请求进来,您创建Future来处理它并注册回调来回复 . 没有"main"线程等待任何事情 .

    或者另一个示例,您希望向具有单独超时的外部服务发出1000个请求,然后将所有结果收集到一个位置 . 如果您拥有该服务的异步客户端,则创建1000个请求,将它们包装在异步超时中,然后合并为一个Future . 您可以阻止主线程等待该未来完成或注册回调以打印结果,但您不必创建1000个线程只是为了等待每个单独的请求完成 .

    所以,关键是:如果你已经有了同步流,并且想要在超时中包含它的某些部分,那么你唯一能做的就是阻止当前线程,直到其他线程执行该工作 . 如果要避免阻塞,则需要从一开始就使用异步方法 .

相关问题