首页 文章

Monadic折叠与状态monad在恒定的空间(堆和堆栈)?

提问于
浏览
10

是否可以在常量堆栈和堆空间中在State monad中执行折叠?或者是一种不同的功能技术更适合我的问题?

接下来的部分将描述问题和激励用例 . 我正在使用Scala,但Haskell中的解决方案也是受欢迎的 .


折叠状态Monad填满堆

假设Scalaz 7.考虑一下州Monad的monadic折叠 . 为了避免堆栈溢出,我们将蹦蹦跳跳 .

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

对于大型集合 col ,这将填充堆 .

我相信在折叠过程中,会为集合中的每个值( x: R 参数)创建一个闭包(State mobit),填充堆 . 在执行 run 0 之前,这些都不能被评估,提供初始状态 .

可以避免这种O(n)堆使用吗?

更具体地说,是否可以在折叠之前提供初始状态,以便状态monad可以在每次绑定期间执行,而不是嵌套闭包以供以后评估?

或者可以构建折叠使得它在状态monad之后懒洋洋地执行 run ?这样,下一个 x: R 闭包只有在前一个被评估并且适合垃圾收集之后才会被创建 .

或者这种工作有更好的功能范例吗?


示例应用程序

但也许我正在使用错误的工具来完成工作 . 下面是一个示例用例的演变 . 我在这里走错路吗?

考虑reservoir sampling,即从一个太大而不适合记忆的集合中一次性挑选一个均匀的随机 k 项 . 在Scala中,这样的功能可能是

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

如果拉伸到 TraversableOnce 类型可以像这样使用

val tenRandomInts = (Int.Min to Int.Max) sample 10

sample 完成的工作基本上是 fold

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

但是, update 是有状态的;它取决于 n ,已见过的物品数量 . (它也取决于RNG,但为了简单起见,我认为它是全局的和有状态的 . 用于处理 n 的技术将会非常简单地延伸 . ) . 那么如何处理这种状态呢?

不纯的解决方案很简单,并且使用不断的堆栈和堆运行 .

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

但是纯功能解决方案呢? update 必须将 n 作为附加参数,并将新值与更新的样本一起返回 . 我们可以在隐式状态中包括 n ,折叠累加器,例如,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

但这掩盖了意图;我们只是打算积累样本矢量 . 这个问题似乎已经为国家monad和monadic左侧折叠做好了准备 . 让我们再试一次 .

我们将使用Scalaz 7,这些导入

import scalaz._
import Scalaz._
import scalaz.std.iterable_

并且在 Iterable[A] 上操作,因为Scalaz不支持 Traversable 的monadic折叠 .

sample 现已定义

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

更新的地方

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

不幸的是,这会在大集合上砸堆栈 .

让我们蹦蹦跳跳吧 . sample 现在

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

更新的地方

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

这可以修复堆栈溢出,但仍会为非常大的集合(或非常小的堆)吹出堆 . 在折叠期间创建集合中每个值的一个匿名函数(我相信会关闭每个 x: A 参数),在蹦床运行之前消耗堆 . (FWIW,State版本也有这个问题;堆栈溢出只是首先出现较小的集合 . )

2 回答

  • 1

    使用 State 或任何类似的monad不是解决问题的好方法 . 使用 State 被谴责在大型集合上烧掉堆栈/堆 . 考虑从大集合构造的 x: State[A,B] 的值(例如通过折叠它) . 然后 x 可以在初始状态 A 的不同值上进行评估,从而产生不同的结果 . 所以 x 需要保留集合中包含的所有信息 . 在纯设置中, x 不能忘记一些不会破坏堆栈/堆的信息,因此计算的任何内容都会保留在内存中,直到释放整个monadic值,这只有在计算结果后才会发生 . 因此 x 的内存消耗与集合的大小成正比 .

    我相信这个问题的合适方法是使用函数 iteratees/pipes/conduits . 这个概念(在这三个名称下引用)被发明用于处理具有恒定存储器消耗的大量数据集合,并使用简单的组合器来描述这样的过程 .

    我试图使用Scalaz' Iteratees ,但似乎这部分尚未成熟,它就像 State 一样遭遇堆栈溢出(或者我可能没有正确使用它;代码可用here,如果有人感兴趣的话) .

    但是,使用我的(仍然有点实验性)scala-conduit库( disclaimer: 我是作者)很简单:

    import conduit._
    import conduit.Pipe._
    
    object Run extends App {
      // Define a sampling function as a sink: It consumes
      // data of type `A` and produces a vector of samples.
      def sampleI[A](k: Int): Sink[A, Vector[A]] =
        sampleI[A](k, 0, Vector())
    
      // Create a sampling sink with a given state. It requests
      // a value from the upstream conduit. If there is one,
      // update the state and continue (the first argument to `requestF`).
      // If not, return the current sample (the second argument).
      // The `Finalizer` part isn't important for our problem.
      private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                      Sink[A, Vector[A]] =
        requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
                 (_: Any) => sample)(Finalizer.empty)
    
    
      // The sampling algorithm copied from the question.
      val rand = new scala.util.Random()
    
      def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
        if (sample.size < k) {
          sample :+ x // must keep first k elements
        } else {
          val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
          if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
          else
            sample
        }
      }
    
      // Construct an iterable of all `short` values, pipe it into our sampling
      // funcition, and run the combined pipe.
      {
        print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
              sampleI(10)))
      }
    }
    

    Update: 使用 State 可以解决问题,但是我们需要专门为 State 实现自定义折叠,它知道如何做恒定空间:

    import scala.collection._
    import scala.language.higherKinds
    import scalaz._
    import Scalaz._
    import scalaz.std.iterable._
    
    object Run extends App {
      // Folds in a state monad over a foldable
      def stateFold[F[_],E,S,A](xs: F[E],
                                f: (A, E) => State[S,A],
                                z: A)(implicit F: Foldable[F]): State[S,A] =
        State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))
    
    
      // Sample a lazy collection view
      def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                      State[Int,Vector[A]] =
        stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())
    
      // update using State monad
      def update[A](k: Int) = {
        (acc: Vector[A], x: A) => State[Int, Vector[A]] {
            n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
        }
      }
    
      def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...
    
      {
        print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
      }
    }
    
  • 6

    我们真正的问题是未经执行的国家动员使用的堆 .

    不它不是 . 真正的问题是该集合不适合内存, foldLeftMfoldRightM 强制整个集合 . 不纯的解决方案的副作用是你可以随时释放内存 . 在"purely functional"解决方案中,你不是在任何地方都这样做 .

    您对 Iterable 的使用忽略了一个至关重要的细节:实际上是什么样的集合 col ,它的元素是如何创建的以及它们如何被丢弃 . 所以,必然会 Iterable Iterable . 它可能过于严格,你正在强迫整个集合进入内存 . 例如,如果它是 Stream ,那么只要你坚持 col ,到目前为止所有强制的元素都将在内存中 . 如果它是一些其他类型的懒惰 Iterable ,它不会记住它的元素,那么折叠仍然太严格 .

    我尝试了你的第一个例子 EphemeralStream 没有看到任何明显的堆压力,即使它显然会有相同的"unexecuted State mobits" . 区别在于 EphemeralStream 的元素被弱引用,而 foldRight 不会强制整个流 .

    我怀疑如果你使用了 Foldable.foldr ,那么你就不会看到有问题的行为,因为它在第二个参数中使用了一个懒惰的函数进行折叠 . 当您调用折叠时,您希望它立即返回看起来像这样的悬架:

    Suspend(() => head |+| tail.foldRightM(...))
    

    当蹦床恢复第一次暂停并运行到下一次暂停时,悬架之间的所有分配将可由垃圾收集器释放 .

    请尝试以下方法:

    def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
      if (bs.isEmpty) Monad[M].point(a)
      else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))
    
    val MS = StateT.stateTMonadState[Int, Trampoline]
    import MS._
    
    foldM[M,R,Int](Monoid[R].zero, col) {
      (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
    } run 0 run
    

    这将在一个trampolined monad M 的常量堆中运行,但是会为非trampolined monad溢出堆栈 .

    the real problem is that Iterable is not a good abstraction for data that are too large to fit in memory. 当然,您可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠 . 这很有效,直到你想用另一个程序组成该程序 . 而且我开始调查在一个 State monad开始这样做是为了获得组合性 .

    所以,你可以做什么?以下是一些选项:

    • 利用 ReducerMonoid 及其组合,然后在命令式显式释放循环(或蹦床的懒惰右侧折叠)中作为最后一步运行,之后组合是不可能的或预期的 .

    • 使用 Iteratee 组合和monadic Enumerator 来提供它们 .

    • Scalaz-Stream写出合成流传感器 .

    这些选项中的最后一个是我将在一般情况下使用和推荐的选项 .

相关问题