首页 文章

流变换器Monad为Vector.Stream

提问于
浏览
1

Data.Vector.Stream提供了一个很好的 Stream 实现,由于专注于可熔性而非常有效(更多信息见this paper) . 自 vector-0.1 以来,通过将Step类型移动到monad中,此 Stream 实现略有改变 . (现在,实现位于Data.Vector.Fusion.Stream.Monadic . )

简而言之,这是 Stream 的定义:

data Step s a where
    Yield :: a -> s -> Step s a
    Skip  :: s -> Step s a
    Done  :: Step s a

data Stream a = forall s. Stream (s -> Step s a) s

Step s a 使用更新函数 s -> Step s a 封装了状态 s 的一次迭代的三个可能结果 . 流是 Done ,或跳过输出,或产生输出 . (上面的定义使用GADT,但这不相关,这里 . )

这个 Stream 的简单应用是:

empty :: Stream a
empty = Stream (const Done) ()

singleton :: a -> Stream a
singleton x = Stream step True where
    step True  = Yield x False
    step False = Done

fromList :: [a] -> Stream a
fromList zs = Stream step zs
where
    step (x:xs) = Yield x xs
    step []     = Done

严格左折是这样做的:

foldl'S :: (a -> b -> a) -> a -> Stream b -> a
foldl'S f a0 (Stream step s) = go a0 s where
    go a s = a `seq`
                case step s of
                    Yield b s' -> go (f a b) s'
                    Skip    s' -> go a       s'
                    Done       -> a

这给出了通常的列表式函数 lengthS = foldl'S (\n _ -> n+1) 0 等 . 这肯定不如ConduitPipes那么优雅,但它简单而快速 . 到现在为止还挺好 .

现在,让's try to aggregate '低级'流向更高级别的流 . 例如,如果您有一个比特流 Stream Bool ,您可能希望通过使用一些聪明的编解码器来解码这些位以产生 Stream Int . 当然,始终可以从给定 Stream step s 中提取的 step 函数构建新的步骤函数 s -> Step s b . 重复应用 step :: s->Step s a 函数导致笨拙的 case (step s) of ... 级联,一次又一次地处理三种可能性 DoneSkipYield . 理想情况下, aggregate 应该是这样的:

aggregate :: Stream a -> M?? -> Stream b
newStream = aggregate oldStream $ do
    a1 <- get    -- a1 :: a
    if a1 == True then doSomething
    else do
        a2 <- get
        -- etc.

M?? 是一些monad,需要定义 . 我们试试 Appl s a 类型:

newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)

它被称为 Appl ,因为它具有函数应用程序的签名 . monad实例非常简单:

instance Monad (Appl s) where
    return a = Appl (\_ s -> Yield a s)
    (Appl ap) >>= f = Appl (\step s ->
        case (ap step s) of
            Done -> Done
            Skip s' -> untilNotSkip step s'
            Yield a' s' -> ap' step s' where Appl ap' = f a'

其中 untilNotSkip :: (s->Step s a) -> s -> Step s a 只是重复(嵌套) step :: (s->Step s a) 的应用程序,直到返回 DoneYield .

get 函数只是普通的函数应用程序

get :: Appl s a
get = Appl(\step s -> step s)

为了解决问题,需要完成 FunctorApplicative ,问题出现了: Appl s 无法成为仿函数 . 签名是

fmap :: (a->b) -> Appl s a -> Appl s b

这只是不起作用,因为为了从函数 (s->Step s a) -> s -> Step s a) 创建函数 (s->Step s b) -> s -> Step s b) 我需要一个 b->a . 我可以通过 a->b 拉回 Appl s b ,但是我不能推进 Appl s a - 即我可以有一个逆变函子而不是一个算子 . 那真是怪了 . Streams are quite naturally comonads,但我看不到连接 . Appl 的目的是将步骤函数 s->Step s a 转换为另一个 s->Step s b .

这里出了点问题, Appl 不是正确的“ M?? ” . 任何人都可以帮忙吗?

Update

正如李尧瑶所指出的那种类型应该是类似的

data Walk a b = forall s. Walk ((s->Step s a) -> s -> Step s b)

而Functor,Applicative和Monad实例将是

instance Functor (Step s) where
    fmap f Done        = Done
    fmap f (Skip s)    = Skip s
    fmap f (Yield a s) = Yield (f a) s

instance Functor (Walk a) where
    fmap f (Walk t) = Walk ( \step s -> fmap f (t step s) )

-- default Applicative given a monad instance
ap :: (Monad m) => m (a -> b) -> m a -> m b
ap mf m = do
    f <- mf
    x <- m
    return (f x)

untilNotSkip :: (s->Step s a) -> s -> Step s a
untilNotSkip step s = case step s of
    Done        -> Done
    Skip s'     -> untilNotSkip step s'
    Yield a' s' -> Yield a' s'

instance Monad (Walk a) where
    return a = Walk (\_ s -> Yield a s)
    Walk t >>= f =
        Walk (\step s -> case t (untilNotSkip step) s of
            Done        -> Done
            Skip _      -> error "Internal error."
            Yield b' s' -> case f b' of Walk t' -> t' step s'   -- bad
    )

instance Applicative (Walk a) where
    pure = return
    (<*>) = ap

但是,类型检查器不允许使用此monad实例 . 在 >>= 的定义中, Walk (\step s -> ... 中的 sYield b' s' -> ... 中的 s' 不同,但它必须相同 . 这里的根本问题是 (>>=) :: Walk a b -> (b->Walk a c) -> Walk a c 有两个独立的全量化状态 s ,一个在第一个参数中,另一个由 b->Walk a c 返回 . 实际上这是(滥用符号) (forall s. Walk s a b) -> (forall s'. b->Walk s' a' c) -> (forall s''. Walk s'' a c) ,这在概念上和类型检查器上都没有意义 . 所有三个 ss' ,_ s'' 必须是同一类型 .

Walk 未在 s 上全部量化的变体:

data Walk s a b = Walk ((s->Step s a) -> s -> Step s b)

允许正确定义bind,但是 aggregate 将不起作用:

-- does not compile
aggregate :: Stream a -> Walk s a b -> Stream b
aggregate (Stream step s) (M t) = Stream (t step) s

同样,流状态 s 必须始终相同 . 一种方法是引入 data PreStream s a = PreStream (s -> Step s a) s ,但这也不允许 aggregate :: Stream a -> ?? -> Stream b .

源代码在github上 .

1 回答

  • 2

    让我们再看看 Appl ,因为它看起来几乎是正确的 .

    newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)
    

    我们的想法是通过将"low-level"步骤函数转换为"high-level"来定义流传感器 . 根据该视图,这两个步骤函数不应该将位转换为字节,我们需要 (s -> Step s Bit) -> s -> Step s Byte .

    因此,更好的类型将是

    newtype Walk s b a = Walk ((s -> Step s b) -> s -> Step s a)
    -- A walk is many steps.
    

    此外,由于 Stream 存在于 s 之间的存在量,因此在某些时候需要对 s 进行一些通用量化以使用 Walk ,因此您可以将它放在类型定义中 .

    newtype Walk b a = Walk (forall s. (s -> Step s b) -> s -> Step s a)
    

相关问题