首页 文章

分区Java 8流

提问于
浏览
46

如何在Java 8 Stream上实现"partition"操作?通过分区,我的意思是,将流划分为给定大小的子流 . 不知何故,它将与Guava Iterators.partition()方法相同,只是它's desirable that the partitions are lazily-evaluated Streams rather than List' s .

7 回答

  • 1

    这是一个纯Java解决方案,懒惰地评估而不是使用List .

    public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
        List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
        currentBatch.add(new ArrayList<T>(batchSize));
        return Stream.concat(stream
          .sequential()                   
          .map(new Function<T, List<T>>(){
              public List<T> apply(T t){
                  currentBatch.get(0).add(t);
                  return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
                }
          }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                    .limit(1)
        ).filter(Objects::nonNull);
    }
    

    该方法返回 Stream<List<T>> 以获得灵活性 . 您可以通过 partition(something, 10).map(List::stream) 轻松将其转换为 Stream<Stream<T>> .

  • 0

    我发现这个问题最优雅,最纯粹的java 8解决方案:

    public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
    return IntStream.range(0, getNumberOfPartitions(list, batchSize))
                    .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
                    .collect(toList());
    }
    
    //https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
    private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
        return (list.size() + batchSize- 1) / batchSize;
    }
    
  • 0

    我认为内部存在某种黑客攻击是可能的:

    为批处理创建实用程序类:

    public static class ConcurrentBatch {
        private AtomicLong id = new AtomicLong();
        private int batchSize;
    
        public ConcurrentBatch(int batchSize) {
            this.batchSize = batchSize;
        }
    
        public long next() {
            return (id.getAndIncrement()) / batchSize;
        }
    
        public int getBatchSize() {
            return batchSize;
        }
    }
    

    和方法:

    public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
        ConcurrentBatch batch = new ConcurrentBatch(batchSize);
        //hack java map: extends and override computeIfAbsent
        Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
            @Override
            public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
                List<T> rs = super.computeIfAbsent(key, mappingFunction);
                //apply batchFunc to old lists, when new batch list is created
                if(rs.isEmpty()){
                    for(Entry<Long, List<T>> e : entrySet()) {
                        List<T> batchList = e.getValue();
                        //todo: need to improve
                        synchronized (batchList) {
                            if (batchList.size() == batch.getBatchSize()){
                                batchFunc.accept(batchList);
                                remove(e.getKey());
                                batchList.clear();
                            }
                        }
                    }
                }
                return rs;
            }
        };
        stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
                .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
                .entrySet()
                .stream()
                //map contains only unprocessed lists (size<batchSize)
                .forEach(e -> batchFunc.accept(e.getValue()));
    }
    
  • 7

    如果您想要按顺序使用Stream,则可以对Stream进行分区(以及执行相关的功能,例如窗口化 - 我认为这是您在这种情况下真正需要的功能) . 两个支持标准流的分区的库是cyclops-react(我是作者)和jOOλ,其中cyclops-react扩展(添加诸如Windowing之类的功能) .

    cyclops-streams有一组静态函数StreamUtils用于在Java Streams上运行,还有一系列函数,如splitAt,headAndTail,splitBy,分区用于分区 .

    要将Stream窗口化为大小为30的嵌套流的流,您可以使用窗口方法 .

    对于OP点,在Streaming术语中,将Stream拆分为给定大小的多个Streams是一个Windowing操作(而不是分区操作) .

    Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
    

    有一个名为ReactiveSeq的Stream扩展类,它扩展了jool.Seq并添加了Windowing功能,可以使代码更清晰 .

    ReactiveSeq<Integer> seq;
      ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
    

    正如Tagir指出的那样,这不适用于并行Streams . 如果您想要以多线程方式窗口或批处理您希望执行的Stream . cyclops-react中的LazyFutureStream可能很有用(Windowing在待办事项列表中,但现在可以使用普通的旧批处理) .

    在这种情况下,数据将从执行Stream的多个线程传递到Multi-Producer / Single-Consumer无等待队列,并且来自该队列的顺序数据可以在再次分发到线程之前被窗口化 .

    Stream<List<Data>> batched = new LazyReact().range(0,1000)
                                                  .grouped(30)
                                                  .map(this::process);
    
  • 33

    将任意源流分区为固定大小的批次是不可能的,因为这会搞砸并行处理 . 并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区 .

    但是,可以从随机访问 List 创建分区流 . 例如,在我的StreamEx库中可以使用这样的功能:

    List<Type> input = Arrays.asList(...);
    
    Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
    

    或者如果你真的想要流的流:

    Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
    

    如果您不想依赖第三方库,可以手动实现此类 ofSubLists 方法:

    public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }
    

    这个实现看起来有点长,但它考虑了一些极端情况,例如close-to-MAX_VALUE列表大小 .


    如果你想要无序流的并行友好解决方案(所以你不关心哪个流元素将在单个批处理中组合),你可以像这样使用收集器(感谢@sibnick的灵感):

    public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                       Collector<List<T>, A, R> downstream) {
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if(acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
                (acc1, acc2) -> {
                    acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                    for(T t : acc2.cur) accumulator.accept(acc1, t);
                    return acc1;
                }, acc -> {
                    if(!acc.cur.isEmpty())
                        downstream.accumulator().accept(acc.acc, acc.cur);
                    return downstream.finisher().apply(acc.acc);
                }, Collector.Characteristics.UNORDERED);
    }
    

    用法示例:

    List<List<Integer>> list = IntStream.range(0,20)
                                        .boxed().parallel()
                                        .collect(unorderedBatches(3, Collectors.toList()));
    

    结果:

    [[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
    

    这种收集器是完全线程安全的,并为顺序流生成有序批次 .

    如果要为每个批次应用中间转换,可以使用以下版本:

    public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<T, AA, B> batchCollector,
            Collector<B, A, R> downstream) {
        return unorderedBatches(batchSize, 
                Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
    }
    

    例如,通过这种方式,您可以动态地汇总每个批次中的数字:

    List<Integer> list = IntStream.range(0,20)
            .boxed().parallel()
            .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
                Collectors.toList()));
    
  • 6

    看起来,正如Jon Skeet在他的_530007中所展示的那样,不可能让分区变得懒散 . 对于非惰性分区,我已经有了这个代码:

    public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
        final Iterator<T> it = source.iterator();
        final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
        final Iterable<Stream<T>> iterable = () -> partIt;
    
        return StreamSupport.stream(iterable.spliterator(), false);
    }
    
  • 2

    这是AbacusUtil的快速解决方案

    IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
    

    声明:我是AbacusUtil的开发人员 .

相关问题