我正在尝试从一个管道中读取一组最多50个项目,并在一次IO操作中处理它们 . (这个用例就是我试图将数据插入到数据库中,我想在一个事务中执行整个批处理,因为它的效率要高得多) . 这是我到目前为止的简化版本:
type ExampleType = Int
doSomething :: [ExampleType] -> IO ()
doSomething = undefined
inGroupsOf50 :: Monad m => Producer ExampleType m () -> m ()
inGroupsOf50 input =
runEffect $ input >-> loop
where loop = do entries <- replicateM 50 await
lift $ doSomething entries --Insert a bunch all in one transaction
loop
问题是我可以告诉的,除非要插入的项目数量除以50,否则我会错过一些 . 我真正想要的不是 replicateM 50 await
,如果输入结束,我可以提供多达50个或更少的项目,但我无法弄清楚如何编写它 .
我一直在想pipes-parse可能是正确的库 . draw
看起来有一个很有希望的签名......但到目前为止,所有的比特都没有融入我的脑海 . 我有一个 producer
,我正在写一个 consumer
而且我真的不知道这与 parser
的概念有什么关系 .
1 回答
甚至超过pipes-parse你很可能想看看pipes-group . 特别是,让我们检查一下这个功能
Lens'
位可能很可怕,但可以快速消除:它表明我们可以将Producer a m x
转换为FreeT (Producer a m) m x
[0]所以现在我们必须弄清楚如何处理
FreeT
位 . 特别是,我们想要深入研究free包并拉出函数iterT
这个函数
iterT
,让我们一次FreeT
FreeT
一个"step" . 为了理解这一点,我们首先将iterT
的类型专门用Producer a m
替换f
特别是,只要我们告诉它如何将
Producer a m (m x)
转换为m
-action,"run"就可以FreeT
充满Producer
. 这可能开始看起来更熟悉了 . 当我们定义runChunk
的第一个参数时,我们只需要执行一个Producer
,在这种情况下,它将只有选定数量的元素 .但是有效的返回值是什么?
m x
?这是"continuation",例如在当前版本之后出现的所有块我们都假设我们有Producer
的Char
并且我们想在3个字符之后打印和换行此时
_
孔在p :: Producer Char IO (IO ())
类型的上下文中具有IO ()
和p
类型 . 我们可以使用for
来使用此管道,收集它的返回类型(再次是延续),发出换行符,然后运行continuation .这表现完全符合要求
要清楚,虽然我做了一些阐述,但是一旦你看到所有部分如何组合在一起,这是相当简单的代码 . 这是整个列表:
[0]还有点多,但现在已足够了 .