首页 文章

Scala中的异步IO与期货

提问于
浏览
67

假设我从一些URL下载了一个(可能很大)的图像列表 . 我正在使用Scala,所以我要做的是:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

我对Scala有点新意,所以这看起来对我来说仍然有些神奇:

  • 这是正确的方法吗?任何替代品,如果不是?

  • 如果我要下载100个图像,这会一次创建100个线程,还是会使用线程池?

  • 最后一条指令( display _ )是否会在主线程上执行,如果没有,我该如何确定它?

谢谢你的建议!

3 回答

  • 5

    在Scala 2.10中使用Futures . 它们是Scala团队,Akka团队和Twitter之间的联合工作,以实现更加标准化的未来API和跨框架使用的实现 . 我们刚刚发布了一份指南:http://docs.scala-lang.org/overviews/core/futures.html

    除了完全不阻塞(默认情况下,尽管我们提供了进行托管阻塞操作的能力)和可组合之外,Scala的2.10期货还带有一个隐式线程池来执行您的任务,以及一些管理超时的实用程序 .

    import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
    import scala.concurrent.duration._
    
    // Retrieve URLs from somewhere
    val urls: List[String] = ...
    
    // Download image (blocking operation)
    val imagesFuts: List[Future[...]] = urls.map {
      url => future { blocking { download url } }
    }
    
    // Do something (display) when complete
    val futImages: Future[List[...]] = Future.sequence(imagesFuts)
    Await.result(futImages, 10 seconds).foreach(display)
    

    上面,我们首先导入一些东西:

    • future :用于创造未来的API .

    • blocking :用于托管阻止的API .

    • Future :未来的伴侣对象,其中包含许多有用的期货收集方法 .

    • Await :用于阻止未来的单例对象(将其结果传输到当前线程) .

    • ExecutionContext.Implicits.global :默认的全局线程池,一个ForkJoin池 .

    • duration._ :用于管理超时持续时间的实用程序 .

    imagesFuts 与您最初的做法大致相同 - 唯一的区别是我们使用托管阻止 blocking . 它通知线程池您传递给它的代码块包含长时间运行或阻塞操作 . 这允许池临时生成新工作程序,以确保永远不会发生所有工作程序被阻止 . 这样做是为了防止阻塞应用程序中的饥饿(锁定线程池) . 请注意,线程池还知道托管阻塞块中的代码何时完成 - 因此它将在该点删除备用工作线程,这意味着池将缩减回其预期大小 .

    (如果您想绝对阻止创建其他线程,那么您应该使用AsyncIO库,例如Java的NIO库 . )

    然后我们使用Future伴随对象的集合方法将 imagesFutsList[Future[...]] 转换为 Future[List[...]] .

    Await 对象是我们如何确保在调用线程上执行 display 的方式 - Await.result 只是强制当前线程等待,直到传递它的未来完成 . (这在内部使用托管阻止 . )

  • 127
    val all = Future.traverse(urls){ url =>
      val f = future(download url) /*(downloadContext)*/
      f.onComplete(display)(displayContext)
      f
    }
    Await.result(all, ...)
    
    • 在2.10中使用scala.concurrent.Future,现在是RC .

    • 使用隐式ExecutionContext

    • 新的Future doc明确指出onComplete(和foreach)可以在值可用时立即进行评估 . 老演员Future做同样的事情 . 根据您的要求显示,您可以提供合适的ExecutionContext(例如,单个线程执行程序) . 如果您只是希望主线程等待加载完成,则遍历会为您提供等待的未来 .

  • 3
    • 是的,对我来说似乎很好,但您可能想要研究更强大的twitter-utilAkka Future API(Scala 2.10将以这种方式创建一个新的Future库) .

    • 它使用线程池 .

    • 不,它不会 . 您需要使用GUI工具包的标准机制(Swing为 SwingUtilities.invokeLater 或SWT为 Display.asyncExec ) . 例如 .

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable { display im })))
    

相关问题