首页 文章

Java 8并行流中的自定义线程池

提问于
浏览
317

是否可以为Java 8 parallel stream指定自定义线程池?我找不到任何地方 .

想象一下,我有一个服务器应用程序,我想使用并行流 . 但是应用程序很大且是多线程的,因此我想将它划分为区分 . 我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务 .

如果我不能为不同的模块使用不同的线程池,则意味着在大多数现实情况下我无法安全地使用并行流 .

请尝试以下示例 . 在单独的线程中执行一些CPU密集型任务 . 这些任务利用并行流 . 第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟) . 问题是其他线程卡住并等待破坏的任务完成 . 这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务 .

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

11 回答

  • 16

    除了在您自己的forkJoinPool中触发并行计算的技巧之外,您还可以将该池传递给CompletableFuture.supplyAsync方法,如:

    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
        //parallel task here, for example
        range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
        forkJoinPool
    );
    
  • 7

    到目前为止,我使用了这个问题的答案中描述的解决方案 . 现在,我想出了一个名为Parallel Stream Support的小库:

    ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
    ParallelIntStreamSupport.range(1, 1_000_000, pool)
        .filter(PrimesPrint::isPrime)
        .collect(toList())
    

    但正如@PabloMatiasGomez在评论中指出的那样,并行流的分裂机制存在缺陷,这在很大程度上取决于公共池的大小 . 见Parallel stream from a HashSet doesn't run in parallel .

    我使用此解决方案只为不同类型的工作提供单独的池,但即使我不使用它,我也无法将公共池的大小设置为1 .

  • 6

    我按照以下方式尝试了 custom ForkJoinPool来调整池大小:

    private static Set<String> ThreadNameSet = new HashSet<>();
    private static Callable<Long> getSum() {
        List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
        return () -> aList.parallelStream()
                .peek((i) -> {
                    String threadName = Thread.currentThread().getName();
                    ThreadNameSet.add(threadName);
                })
                .reduce(0L, Long::sum);
    }
    
    private static void testForkJoinPool() {
        final int parallelism = 10;
    
        ForkJoinPool forkJoinPool = null;
        Long result = 0L;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call
    
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown(); //always remember to shutdown the pool
            }
        }
        out.println(result);
        out.println(ThreadNameSet);
    }
    

    这是输出说池正在使用比默认值 4 更多的线程 .

    50000005000000
    [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]
    

    但实际上有一个 weirdo ,当我尝试使用 ThreadPoolExecutor 实现相同的结果时如下:

    BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
    ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));
    

    但我失败了 .

    它只会在一个新线程中启动 parallelStream ,然后其他所有内容都是相同的, again 证明 parallelStream 将使用 the ForkJoinPool 来启动其子线程 .

  • 32

    如果您不介意使用第三方库,使用cyclops-react可以在同一管道中混合顺序和并行Streams并提供自定义ForkJoinPools . 例如

    ReactiveSeq.range(1, 1_000_000)
                .foldParallel(new ForkJoinPool(10),
                              s->s.filter(i->true)
                                  .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                                  .max(Comparator.naturalOrder()));
    

    或者,如果我们希望在顺序流中继续处理

    ReactiveSeq.range(1, 1_000_000)
                .parallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
                .map(this::processSequentially)
                .forEach(System.out::println);
    

    [披露我是独眼巨人的主要开发者 - 反应]

  • 0

    要测量实际使用的线程数,可以检查 Thread.activeCount()

    Runnable r = () -> IntStream
                .range(-42, +42)
                .parallel()
                .map(i -> Thread.activeCount())
                .max()
                .ifPresent(System.out::println);
    
        ForkJoinPool.commonPool().submit(r).join();
        new ForkJoinPool(42).submit(r).join();
    

    这可以在4核CPU上产生如下输出:

    5 // common pool
    23 // custom pool
    

    没有 .parallel() 它给出:

    3 // common pool
    4 // custom pool
    
  • 1

    并行流使用默认的 ForkJoinPool.commonPoolby default has one less threads as you have processors,由 Runtime.getRuntime().availableProcessors() 返回(这意味着并行流使用所有处理器,因为它们也使用主线程):

    对于需要单独或自定义池的应用程序,可以使用给定的目标并行度级别构造ForkJoinPool;默认情况下,等于可用处理器的数量 .

    这也意味着如果您同时启动嵌套并行流或多个并行流,它们将共享同一个池 . 优点:您永远不会使用超过默认值(可用处理器数量) . 缺点:您可能无法为您启动的每个并行流分配"all the processors"(如果您碰巧有多个并行流) . (显然你可以使用_459208来规避它 . )

    要更改并行流的执行方式,您也可以

    • 将并行流执行提交给您自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();

    • 您可以使用系统属性更改公共池的大小: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 表示20个线程的目标并行度 .


    我的机器上有后者的例子有8个处理器 . 如果我运行以下程序:

    long start = System.currentTimeMillis();
    IntStream s = IntStream.range(0, 20);
    //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    s.parallel().forEach(i -> {
        try { Thread.sleep(100); } catch (Exception ignore) {}
        System.out.print((System.currentTimeMillis() - start) + " ");
    });
    

    输出是:

    215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

    因此,您可以看到并行流一次处理8个项目,即它使用8个线程 . 但是,如果我取消注释注释行,则输出为:

    215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

    这次,并行流使用了20个线程,并且流中的所有20个元素已同时处理 .

  • 0

    去获取AbacusUtil . 可以为并行流指定线程号 . 以下是示例代码:

    LongStream.range(4, 1_000_000).parallel(threadNum)...
    

    披露:我是AbacusUtil的开发人员 .

  • 319

    使用ForkJoinPool并提交并行流不能可靠地使用所有线程 . 如果你看这个(Parallel stream from a HashSet doesn't run in parallel)和这个(Why does the parallel stream not use all the threads of the ForkJoinPool?),你会看到推理 .

    简短版本:如果ForkJoinPool / submit不适合您,请使用

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");
    
  • 2

    实际上有一个技巧如何在特定的fork-join池中执行并行操作 . 如果您将它作为fork-join池中的任务执行,它会保留在那里并且确实存在不要使用普通的 .

    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    forkJoinPool.submit(() ->
        //parallel task here, for example
        IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
    ).get();
    

    诀窍是基于ForkJoinTask.fork,它指定:"Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

  • 163

    如果您不需要自定义ThreadPool但是您想要限制并发任务的数量,则可以使用:

    List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
    List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method
    
    partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
           // do your processing   
    }));
    

    (要求这个问题的重复问题已被锁定,所以请在这里告诉我)

  • 0

    Note: 似乎在JDK 10中实现了一个修复,确保自定义线程池使用预期的线程数 .

    自定义ForkJoinPool中的并行流执行应遵循并行性https://bugs.openjdk.java.net/browse/JDK-8190974

相关问题