首页 文章

使用返回未来的函数遍历列表和流

提问于
浏览
29

简介

Scala的 Futurenew in 2.10now 2.9.3)是一个应用函子,这意味着如果我们有traversable type F ,我们可以将 F[A] 和函数 A => Future[B] 转换为 Future[F[B]] .

此操作在标准库中可用作Future.traverse . Scalaz 7还提供了一个更通用的 traverse ,如果我们从scalaz-contrib library导入 Future 的applicative functor实例,我们可以在这里使用 .

这两种方法在流的情况下表现不同 . 标准库遍历在返回之前使用流,而Scalaz的returns the future immediately

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

还有另一个区别,因为Leif Warner观察到here . 标准库的 traverse 立即启动所有异步操作,而Scalaz启动第一个,等待它完成,启动第二个,等待它,依此类推 .

流的不同行为

通过编写一个将为流中的第一个值休眠几秒钟的函数来显示第二个差异非常容易:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

现在 Future.traverse(Stream(1, 2))(toFuture) 将打印以下内容:

Starting 1!
Starting 2!
Done 2!
Done 1!

和Scalaz版本( Stream(1, 2).traverse(toFuture) ):

Starting 1!
Done 1!
Starting 2!
Done 2!

这可能不是我们想要的 .

和列表?

奇怪的是,这两个遍历在这方面的表现相同 - Scalaz不会等待一个未来在开始下一个之前完成 .

另一个未来

Scalaz还包含自己的concurrent包,其中包含自己的期货实现 . 我们可以使用与上面相同的设置:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

然后我们在列表和流的流上获得Scalaz的行为:

Starting 1!
Done 1!
Starting 2!
Done 2!

也许不那么令人惊讶的是,遍历无限流仍然会立即返回 .

问题

此时我们确实需要一个表来总结,但列表必须要做:

  • 标准库遍历的流:返回前消耗;不要等待每一个未来 .

  • 使用Scalaz遍历的流:立即返回;等待每个未来完成 .

  • Scalaz期货与流:立即返回;等待每个未来完成 .

和:

  • 标准库遍历列表:不要等待 .

  • Scalaz遍历列表:不要等待 .

  • 带有列表的Scalaz期货:等待每个未来完成 .

这有意义吗?列表和流上的此操作是否存在“正确”行为?是否存在某种原因导致“最异步”的行为 - 即,在返回之前不消耗集合,并且不等待每个未来在继续下一个之前完成 - 这里没有表示?

2 回答

  • 0

    我不能回答所有问题,但我尝试了一些部分:

    是否有某种原因导致“最异步”的行为 - 即,在返回之前不消耗集合,并且不等待每个未来完成再继续下一个 - 这里没有表示?

    如果您有依赖计算和有限数量的线程,则可能会遇到死锁 . 例如,你有两个期货,取决于第三个(期货列表中的所有三个)和只有两个线程,你可以遇到前两个期货阻止所有两个线程而第三个永远不会被执行的情况 . (当然,如果您的池大小为1,即zou执行一个计算,则可以得到类似的情况)

    要解决这个问题,每个未来都需要一个线程,没有任何限制 . 这适用于小型期货清单,但不适用于大型期货 . 因此,如果你并行运行,你将会遇到这样的情况:小例子将在所有情况下运行,而较大的例子将会死锁 . (例如:开发人员测试运行正常, 生产环境 死锁) .

    列表和流上的此操作是否存在“正确”行为?

    我认为未来是不可能的 . 如果你知道更多的依赖项,或者当你确定计算不会阻塞时,可能会有更多的并发解决方案 . 但是执行期货清单会让我“破坏设计” . 最好的解决方案似乎是一个,对于死锁的小例子来说已经失败了(即一个接一个地执行一个Future) .

    带有列表的Scalaz期货:等待每个未来完成 .

    我认为scalaz在内部使用for comprehension进行遍历 . 为了理解,不能保证计算是独立的 . 所以我猜Scalaz在这里做的正确为了理解:一个接一个地进行计算 . 在期货的情况下,这将始终有效,因为您的操作系统中有无限的线程 .

    换句话说:你只看到一个关于理解(必须)如何运作的神器 .

    我希望这是有道理的 .

  • 0

    如果我正确理解了这个问题,我认为这实际上归结为流与列表的语义 .

    遍历列表可以实现我们对文档的期望:

    使用提供的函数A => Future [B]将TraversableOnce [A]转换为Future [TraversableOnce [B]] . 这对于执行并行映射非常有用 . 例如,要将函数并行应用于列表的所有项:

    使用流,由开发人员决定他们希望它如何工作,因为它依赖于比编译器更多的流知识(流可以是无限的,但类型系统不知道它) . 如果我的流正在从文件中读取行,我想首先使用它,因为逐行链接期货实际上不会并行化 . 在这种情况下,我想要并行方法 .

    另一方面,如果我的流是一个无限列表,生成顺序整数并且搜索大于某个大数的第一个素数,则不可能在一次扫描中首先使用流(将需要链接的方法,我们'可能想要从流中批量运行) .

    而不是试图找出一种规范的方法来处理这个问题,我想知道是否有缺少的类型可以帮助使不同的案例更加明确 .

相关问题