首页 文章

使用回调编写两个Scala期货,没有第三个ExecutionContext

提问于
浏览
0

我有两种方法,让我们称它们为 load()init() . 每个人都在自己的线程中开始计算,并在自己的执行上下文中返回 Future . 这两个计算是独立的 .

val loadContext = ExecutionContext.fromExecutor(...)
def load(): Future[Unit] = {
  Future
}

val initContext = ExecutionContext.fromExecutor(...)
def init(): Future[Unit] = {
  Future { ... }(initContext)
}

我想从第三个线程调用这两个 - 比如它来自 main() - 并在两个完成时执行其他一些计算 .

def onBothComplete(): Unit = ...

现在:

  • 我不在乎哪个先完成

  • 我不关心执行其他计算的线程,除了:

  • 我不想阻止任何一个线程等待另一个;

  • 我不想阻止第三个(调用)线程;和

  • 我不想只是为了设置标志而启动第四个线程 .

如果我使用for-comprehensions,我会得到类似的东西:

val loading = load()
val initialization = initialize()

for {
  loaded <- loading
  initialized <- initialization
} yield { onBothComplete() }

我得到 Cannot find an implicit ExecutionContext.

我认为这意味着Scala想要第四个线程等待两个期货的完成并设置标志,要么是一个明确的新 ExecutionContextExecutionContext.Implicits.global . 所以似乎理解力已经消失了 .

我以为我可以嵌套回调:

initialization.onComplete {
  case Success(_) =>
    loading.onComplete {
      case Success(_) => onBothComplete()
      case Failure(t) => log.error("Unable to load", t)
    }
  case Failure(t) => log.error("Unable to initialize", t)
}

不幸的是 onComplete 也采取了隐含的 ExecutionContext ,我得到了同样的错误 . (这也很丑陋,如果 initialization 失败,则会从 loading 丢失错误消息 . )

有没有办法在没有阻止和没有引入另一个 ExecutionContext 的情况下编写Scala Futures?如果没有,我可能不得不将它们抛弃为Java 8 CompletableFutures或Javaslang Vavr Futures,它们都能够在执行原始工作的线程上运行回调 .

Updated 澄清阻止任一线程等待另一个线程也是不可接受的 .

Updated again 对完成后计算不太具体 .

2 回答

  • 1

    为什么不重用一个自己的执行上下文?不确定你对它们的要求是什么,但是如果你使用单个线程 Actuator ,你可以只重用那个作为你的理解的执行上下文,你将不会创建任何新的线程:

    implicit val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
    

    如果您真的无法重用它们,您可以将其视为隐式执行上下文:

    implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
      (runnable: Runnable) => {
        runnable.run()
      })
    

    哪个将在当前线程上运行期货 . 但是,Scala文档明确建议不要这样做,因为它引入了非确定性,其中线程运行 Future (但正如您所说,您不关心它运行在哪个线程上,因此这可能无关紧要) .

    请参阅Synchronous Execution Context了解为什么这是不可取的 .

    该上下文的一个示例:

    val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
    
    def load(): Future[Unit] = {
      Future(println("loading thread " + Thread.currentThread().getName))(loadContext)
    }
    
    val initContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
    
    def init(): Future[Unit] = {
      Future(println("init thread " + Thread.currentThread().getName))(initContext)
    }
    
    val doneFlag = new AtomicBoolean(false)
    
    val loading = load()
    val initialization = init()
    
    implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
      (runnable: Runnable) => {
        runnable.run()
      })
    
    for {
      loaded <- loading
      initialized <- initialization
    } yield {
      println("yield thread " + Thread.currentThread().getName)
      doneFlag.set(true)
    }
    

    打印:

    loading thread pool-1-thread-1
    init thread pool-2-thread-1
    yield thread main
    

    虽然 yield 行可以打印 pool-1-thread-1pool-2-thread-1 ,具体取决于运行 .

  • 1

    在Scala中, Future 表示要执行异步(即与其他工作单元同时)的工作 . ExecutionContext 表示用于执行 Future 的线程池 . 换句话说, ExecutionContext 是执行实际工作的 Worker 团队 .

    为了提高效率和可扩展性,最好拥有大型团队(例如单个 ExecutionContext ,其中包含10个线程来执行10个266109_),而不是小团队(例如5个 ExecutionContext ,每个线程有2个线程执行10个 Future ) .

    在您的情况下,如果要将线程数限制为2,您可以:

    def load()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
      Future { ... } /* will use the teamOfWorkers implicitly */
    }
    
    def init()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = {
      Future { ... } /* will use the teamOfWorkers implicitly */
    }
    
    implicit val bigTeamOfWorkers = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    /* All async works in the following will use 
       the same bigTeamOfWorkers implicitly and works will be shared by
       the 2 workers (i.e. thread) in the team  */
    for {
      loaded <- loading
      initialized <- initialization
    } yield doneFlag.set(true)
    

    Cannot find an implicit ExecutionContext 错误并不意味着Scala需要额外的线程 . 这只意味着Scala希望 ExecutionContext 能够完成这项工作 . 另外 ExecutionContext 并不一定意味着额外的'thread',例如以下 ExecutionContext ,而不是创建新线程,将在当前线程中执行工作:

    val currThreadExecutor = ExecutionContext.fromExecutor(new Executor {
      override def execute(command: Runnable): Unit = command.run()
    })
    

相关问题