是否可以在常量堆栈和堆空间中在State monad中执行折叠?或者是一种不同的功能技术更适合我的问题?
接下来的部分将描述问题和激励用例 . 我正在使用Scala,但Haskell中的解决方案也是受欢迎的 .
折叠状态Monad填满堆
假设Scalaz 7.考虑一下州Monad的monadic折叠 . 为了避免堆栈溢出,我们将蹦蹦跳跳 .
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
对于大型集合 col
,这将填充堆 .
我相信在折叠过程中,会为集合中的每个值( x: R
参数)创建一个闭包(State mobit),填充堆 . 在执行 run 0
之前,这些都不能被评估,提供初始状态 .
可以避免这种O(n)堆使用吗?
更具体地说,是否可以在折叠之前提供初始状态,以便状态monad可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?
或者可以构建折叠使得它在状态monad之后懒洋洋地执行 run
?这样,下一个 x: R
闭包只有在前一个被评估并且适合垃圾收集之后才会被创建 .
或者这种工作有更好的功能范例吗?
示例应用程序
但也许我正在使用错误的工具来完成工作 . 下面是一个示例用例的演变 . 我在这里走错路吗?
考虑reservoir sampling,即从一个太大而不适合记忆的集合中一次性挑选一个均匀的随机 k
项 . 在Scala中,这样的功能可能是
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
如果拉伸到 TraversableOnce
类型可以像这样使用
val tenRandomInts = (Int.Min to Int.Max) sample 10
sample
完成的工作基本上是 fold
:
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
但是, update
是有状态的;它取决于 n
,已见过的物品数量 . (它也取决于RNG,但为了简单起见,我认为它是全局的和有状态的 . 用于处理 n
的技术将会非常简单地延伸 . ) . 那么如何处理这种状态呢?
不纯的解决方案很简单,并且使用不断的堆栈和堆运行 .
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
但是纯功能解决方案呢? update
必须将 n
作为附加参数,并将新值与更新的样本一起返回 . 我们可以在隐式状态中包括 n
,折叠累加器,例如,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
但这掩盖了意图;我们只是打算积累样本矢量 . 这个问题似乎已经为国家monad和monadic左侧折叠做好了准备 . 让我们再试一次 .
我们将使用Scalaz 7,这些导入
import scalaz._
import Scalaz._
import scalaz.std.iterable_
并且在 Iterable[A]
上操作,因为Scalaz不支持 Traversable
的monadic折叠 .
sample
现已定义
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
更新的地方
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
不幸的是,这会在大集合上砸堆栈 .
让我们蹦蹦跳跳吧 . sample
现在
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
更新的地方
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
这可以修复堆栈溢出,但仍会为非常大的集合(或非常小的堆)吹出堆 . 在折叠期间创建集合中每个值的一个匿名函数(我相信会关闭每个 x: A
参数),在蹦床运行之前消耗堆 . (FWIW,State版本也有这个问题;堆栈溢出只是首先出现较小的集合 . )
2 回答
使用
State
或任何类似的monad不是解决问题的好方法 . 使用State
被谴责在大型集合上烧掉堆栈/堆 . 考虑从大集合构造的x: State[A,B]
的值(例如通过折叠它) . 然后x
可以在初始状态A
的不同值上进行评估,从而产生不同的结果 . 所以x
需要保留集合中包含的所有信息 . 在纯设置中,x
不能忘记一些不会破坏堆栈/堆的信息,因此计算的任何内容都会保留在内存中,直到释放整个monadic值,这只有在计算结果后才会发生 . 因此x
的内存消耗与集合的大小成正比 .我相信这个问题的合适方法是使用函数 iteratees/pipes/conduits . 这个概念(在这三个名称下引用)被发明用于处理具有恒定存储器消耗的大量数据集合,并使用简单的组合器来描述这样的过程 .
我试图使用Scalaz'
Iteratees
,但似乎这部分尚未成熟,它就像State
一样遭遇堆栈溢出(或者我可能没有正确使用它;代码可用here,如果有人感兴趣的话) .但是,使用我的(仍然有点实验性)scala-conduit库( disclaimer: 我是作者)很简单:
Update: 使用
State
可以解决问题,但是我们需要专门为State
实现自定义折叠,它知道如何做恒定空间:不它不是 . 真正的问题是该集合不适合内存,
foldLeftM
和foldRightM
强制整个集合 . 不纯的解决方案的副作用是你可以随时释放内存 . 在"purely functional"解决方案中,你不是在任何地方都这样做 .您对
Iterable
的使用忽略了一个至关重要的细节:实际上是什么样的集合col
,它的元素是如何创建的以及它们如何被丢弃 . 所以,必然会Iterable
Iterable
. 它可能过于严格,你正在强迫整个集合进入内存 . 例如,如果它是Stream
,那么只要你坚持col
,到目前为止所有强制的元素都将在内存中 . 如果它是一些其他类型的懒惰Iterable
,它不会记住它的元素,那么折叠仍然太严格 .我尝试了你的第一个例子
EphemeralStream
没有看到任何明显的堆压力,即使它显然会有相同的"unexecuted State mobits" . 区别在于EphemeralStream
的元素被弱引用,而foldRight
不会强制整个流 .我怀疑如果你使用了
Foldable.foldr
,那么你就不会看到有问题的行为,因为它在第二个参数中使用了一个懒惰的函数进行折叠 . 当您调用折叠时,您希望它立即返回看起来像这样的悬架:当蹦床恢复第一次暂停并运行到下一次暂停时,悬架之间的所有分配将可由垃圾收集器释放 .
请尝试以下方法:
这将在一个trampolined monad
M
的常量堆中运行,但是会为非trampolined monad溢出堆栈 .但 the real problem is that Iterable is not a good abstraction for data that are too large to fit in memory. 当然,您可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠 . 这很有效,直到你想用另一个程序组成该程序 . 而且我开始调查在一个
State
monad开始这样做是为了获得组合性 .所以,你可以做什么?以下是一些选项:
利用
Reducer
,Monoid
及其组合,然后在命令式显式释放循环(或蹦床的懒惰右侧折叠)中作为最后一步运行,之后组合是不可能的或预期的 .使用
Iteratee
组合和monadicEnumerator
来提供它们 .用Scalaz-Stream写出合成流传感器 .
这些选项中的最后一个是我将在一般情况下使用和推荐的选项 .