如何在Java 8 Stream上实现"partition"操作?通过分区,我的意思是,将流划分为给定大小的子流 . 不知何故,它将与Guava Iterators.partition()方法相同,只是它's desirable that the partitions are lazily-evaluated Streams rather than List' s .
如何在Java 8 Stream上实现"partition"操作?通过分区,我的意思是,将流划分为给定大小的子流 . 不知何故,它将与Guava Iterators.partition()方法相同,只是它's desirable that the partitions are lazily-evaluated Streams rather than List' s .
7 回答
这是一个纯Java解决方案,懒惰地评估而不是使用List .
该方法返回
Stream<List<T>>
以获得灵活性 . 您可以通过partition(something, 10).map(List::stream)
轻松将其转换为Stream<Stream<T>>
.我发现这个问题最优雅,最纯粹的java 8解决方案:
我认为内部存在某种黑客攻击是可能的:
为批处理创建实用程序类:
和方法:
如果您想要按顺序使用Stream,则可以对Stream进行分区(以及执行相关的功能,例如窗口化 - 我认为这是您在这种情况下真正需要的功能) . 两个支持标准流的分区的库是cyclops-react(我是作者)和jOOλ,其中cyclops-react扩展(添加诸如Windowing之类的功能) .
cyclops-streams有一组静态函数StreamUtils用于在Java Streams上运行,还有一系列函数,如splitAt,headAndTail,splitBy,分区用于分区 .
要将Stream窗口化为大小为30的嵌套流的流,您可以使用窗口方法 .
对于OP点,在Streaming术语中,将Stream拆分为给定大小的多个Streams是一个Windowing操作(而不是分区操作) .
有一个名为ReactiveSeq的Stream扩展类,它扩展了jool.Seq并添加了Windowing功能,可以使代码更清晰 .
正如Tagir指出的那样,这不适用于并行Streams . 如果您想要以多线程方式窗口或批处理您希望执行的Stream . cyclops-react中的LazyFutureStream可能很有用(Windowing在待办事项列表中,但现在可以使用普通的旧批处理) .
在这种情况下,数据将从执行Stream的多个线程传递到Multi-Producer / Single-Consumer无等待队列,并且来自该队列的顺序数据可以在再次分发到线程之前被窗口化 .
将任意源流分区为固定大小的批次是不可能的,因为这会搞砸并行处理 . 并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区 .
但是,可以从随机访问
List
创建分区流 . 例如,在我的StreamEx库中可以使用这样的功能:或者如果你真的想要流的流:
如果您不想依赖第三方库,可以手动实现此类
ofSubLists
方法:这个实现看起来有点长,但它考虑了一些极端情况,例如close-to-MAX_VALUE列表大小 .
如果你想要无序流的并行友好解决方案(所以你不关心哪个流元素将在单个批处理中组合),你可以像这样使用收集器(感谢@sibnick的灵感):
用法示例:
结果:
这种收集器是完全线程安全的,并为顺序流生成有序批次 .
如果要为每个批次应用中间转换,可以使用以下版本:
例如,通过这种方式,您可以动态地汇总每个批次中的数字:
看起来,正如Jon Skeet在他的_530007中所展示的那样,不可能让分区变得懒散 . 对于非惰性分区,我已经有了这个代码:
这是AbacusUtil的快速解决方案
声明:我是AbacusUtil的开发人员 .