首页 文章

如何编写一个管道,向下游发送从上游接收的内容列表?

提问于
浏览
4

我很难用这个签名写一个 pipe

toOneBigList :: (Monad m, Proxy p) => () -> Pipe p a [a] m r

它应该简单地从上游获取所有 a 并将它们发送到下游列表中 .

我的所有尝试都从根本上被打破了 .

任何人都能指出我正确的方向吗?

2 回答

  • 9

    有两个基于_1693560的解决方案,我会让你选择你喜欢的解决方案 .

    注意:目前尚不清楚为什么在下游接口上输出列表而不是直接返回它 .

    管道式

    第一个非常接近基于 conduit 的解决方案使用即将推出的 pipes-pase ,它基本上是完整的,只需要文档 . 你可以在Github上找到latest draft .

    使用 pipes-parse ,解决方案与Petr给出的 conduit 解决方案完全相同:

    import Control.Proxy
    import Control.Proxy.Parse
    
    combine
        :: (Monad m, Proxy p)
        => () -> Pipe (StateP [Maybe a] p) (Maybe a) [a] m ()
    combine () = loop []
      where
        loop as = do
            ma <- draw
            case ma of
                Nothing -> respond (reverse as)
                Just a  -> loop (a:as)
    

    draw 类似于 conduitawait :它从剩余缓冲区(即 StateP 部分)请求一个值,如果缓冲区为空则从上游请求一个值 . Nothing 表示文件结束 .

    您可以使用 pipes-parse 中的 wrap 函数来包装没有文件结束信号的管道,该函数的类型为:

    wrap :: (Monad m, Proxy p) => p a' a b' b m r -> p a' a b' (Maybe b) m s
    

    经典管道样式

    第二种选择稍微简单一些 . 如果要折叠给定的管道,可以使用 WriterP 直接进行折叠:

    import Control.Proxy
    import Control.Proxy.Trans.Writer
    
    foldIt
      :: (Monad m, Proxy p) =>
         (() -> Pipe p a b m ()) -> () -> Pipe p a [b] m ()
    foldIt p () = runIdentityP $ do
        r <- execWriterK (liftP . p >-> toListD >-> unitU) ()
        respond r
    

    这是对正在发生的事情的更高级别的描述,但它需要传入管道作为显式参数 . 这取决于你喜欢哪一个 .

    顺便说一句,这就是为什么我问你为什么要向下游发送一个值 . 如果您返回折叠列表,上面的内容会简单得多:

    foldIt p = execWriterK (liftP . p >-> toListD)
    

    如果 p 在其代理类型中完全是多态的,则甚至可能不需要 liftP . 我只是将其作为预防措施 .

    奖金解决方案

    pipes-parse 没有提供 toOneBigList 的原因是它始终是一个管道反模式,将结果分组到一个列表中 . pipes 有几个不错的功能,即使您尝试生成多个列表,也可以永远不必将输入分组到列表中 . 例如,使用 respond 组合,您可以让代理生成它将遍历的流的子集,然后注入使用该子集的处理程序:

    example :: (Monad m, Proxy p) => () -> Pipe p a (() -> Pipe p a a m ()) m r
    example () = runIdentityP $ forever $ do
        respond $ \() -> runIdentityP $ replicateM_ 3 $ request () >>= respond
    
    printIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
    printIt () = runIdentityP $ do
        lift $ putStrLn "Here we go!"
        printD ()
    
    useIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
    useIt = example />/ (\p -> (p >-> printIt) ())
    

    以下是如何使用它的示例:

    >>> runProxy $ enumFromToS 1 10 >-> useIt
    Here we go!
    1
    2
    3
    Here we go!
    4
    5
    6
    Here we go!
    7
    8
    9
    Here we go!
    10
    

    这意味着即使需要对元素进行分组,也不需要将单个元素带入内存 .

  • 2

    我只给出一个部分答案,也许其他人会有更好的答案 .

    据我所知,标准管道没有检测管道其他部分何时终止的机制 . 终止的第一个管道产生管道的最终结果,而其他所有管道都被丢弃 . 因此,如果你有一个永远消耗输入的管道(最终产生一个列表),那么当它的上游完成时,它就没有机会动作并产生输出 . (这是故意的,因此上游和下游部分彼此是双重的 . )也许这在管道顶部的某些图书馆建筑中得到了解决 .

    情况与conduit不同 . 它具有consume功能,它将所有输入组合成一个列表并返回(而不是输出)它 . 编写一个类似于你需要的函数,最后输出列表并不困难:

    import Data.Conduit
    
    combine :: (Monad m) => Conduit a m [a]
    combine = loop []
      where
        loop xs = await >>= maybe (yield $ reverse xs) (loop . (: xs))
    

相关问题