首页 文章

Java固定大小的线程池和所有CPU核心的最佳使用

提问于
浏览
4

如何使用8个线程一直用于“昂贵”的部件?

我有一个数字运算问题,我创建了一个简单的框架 . 我的问题是找到一种优雅而简单的方法来优化使用所有CPU内核 .

为了获得良好的性能,我使用固定大小为8的线程池 . 我们的想法是使用与硬件线程一样多的线程以获得最佳性能 .

框架的简化伪代码使用如下:

interface Task {
  data[] compute(data[]);
}

Task task = new Loop(new Chain(new DoX(), new DoY(), new Split(2, new DoZ())));
result = task.compute(data);
  • 循环任务将循环,直到满足某些终止条件

  • Chain Task将链接任务(例如在上面的r = t1.compute(r); r = t2.compute(r); r = t3.compute(r); return r;)

  • 拆分任务将拆分数据并在部件上执行任务(例如,创建2个部件并返回新数据[] {t1.compute(part1),t1.compute(part2)})

线程目前在Split任务中实现 . 因此,Split Task会将t1.compute(part1)和t1.compute(part2)的计算交给线程池 .

方法1,可能完全死锁

我的第一种方法是,Split Task有一系列期货,并且一个接一个地调用get() . 但这意味着如果Split任务位于另一个Split Task中,则future.get()中的阻塞等待将阻止外部Split Task从线程池中获取的线程 . 所以我有不到8个线程真正起作用 . 如果这种等级很深,我可能没有人在工作并且永远等待 .

1) 我假设future.get()不会将线程返回到线程池,对吗?所以如果这样做我会在将来等待 . 但是没有更多的线程可以开始工作吗? [我不能轻易测试,因为我已经改变了方法]

方法2,目前的方法,至少有人在工作

我当前的方法(并不是更好)是使用当前线程完成拆分的最后一部分(partN) . 如果完成,我检查partN-1是否已经启动,如果是,我等待future.get()中的所有任务,否则当前线程也会执行partN-1,如果需要partN-2 ...所以现在我应该总是在池中至少有一个线程工作 .

但是,由于问题1)的答案可能是future.get()将阻塞我的线程,通过这种方法,我将在深层次结构上只有很少的工作线程 .

方法3,我看到的唯一解决方案

我假设我必须使用2个线程池,一个用于努力工作,一个用于所有等待 . 所以我会有一个固定大小的线程池用于努力工作和(动态?)一个等待 .

3.a . :但这意味着Split Task必须只从等待池中生成线程,而执行实际工作的Task将从工作池中生成一个新线程并等待它完成 . 丑陋,但应该工作 . 丑陋,因为目前整个线程支持都在Split任务中,但是通过这个解决方案,其他完成艰苦工作的任务必须知道线程 .

3.b . :另一种方法是Split生成工作线程,但是在内部拆分时,每个等待必须由等待线程完成,而当前线程同时执行工作线程任务 . 有了这个,所有线程支持都在Split Task类中,但我不确定如何实现它 .

2a) 如何在不阻塞当前线程的情况下等待任务?

2b) 我可以将当前线程返回到工作线程池,让服务器线程等待,然后等待继续前一个当前线程或来自工作线程池的线程吗?怎么样?

其他解决方案

不要使用固定大小的线程池 .

3) 我的想法是8个线程错了吗?但是,如果层次结构可以深入多少呢?并不存在JVM并行启动许多任务并在它们之间切换很多任务的风险吗?

4) 我想念什么或者你会怎么做才能解决这个问题?

非常感谢和问候


[编辑]

接受的解决方案以及为什么我尝试不同的东西(基于方法2)

我接受了ForkJoinPool作为正确的解决方案 .

然而,一些细节和可能的开销和失去控制使我想尝试另一种方法 . 但是我想的越多,我就越回到使用ForkJoinPool(最后请参阅说明) . 抱歉,文字数量 .

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

“但是,面对被阻止的IO或其他非托管同步,不能保证这样的调整 . ”

“最大运行线程数为32767”

http://homes.cs.washington.edu/~djg/teachingMaterials/grossmanSPAC_forkJoinFramework.html

“ForkJoin框架的文档表明了这一点创建并行子任务,直到基本计算步数超过100且小于10,000 . “

“艰苦工作”任务从磁盘读取大量数据,并且距离10,000次基本计算非常远 . 实际上我可以将它分叉/加入到可接受的级别,但现在这工作太多了,因为代码的这一部分相当复杂 .

我认为方法3a基本上是ForkJoin的一个实现,除了我会有更多的控制和可能更少的开销,上面提到的问题不应该存在(但没有自动适应OS提供的CPU资源,但我会强制操作系统如果必须的话,给我我想要的东西 .

我可能尝试使用方法2进行一些更改:这样我可以使用精确的线程编号而且我没有任何等待线程,如果我理解正确,ForkJoinPool似乎可以使用等待线程 .

当前线程执行作业,直到此Split实例中的所有作业都由工作线程运行(因此像以前一样在Split节点中工作),但是它不会调用future.get(),而只是检查所有期货是否已准备就绪的Future.isDone() . 如果不是全部完成,它将从线程池中窃取一个作业并执行它,然后再次检查期货 . 这样,只要有一个工作没有运行,我就永远不会等待 .

丑陋:如果没有工作要偷,我将不得不睡一小段时间,然后再次检查期货或从池中偷取一份新工作(有没有办法等待多个期货全部完成超时如果它触发,它将不会取消计算?)

所以我认为我必须在每个Split Task中使用ThreadPool的Completion Service,然后我可以使用超时轮询并且不需要睡眠 .

假设:完成服务中的ThreadPool仍然可以像普通的ThreadPool一样使用(例如,作业窃取) . 一个ThreadPool可以在许多完成服务中 .

我认为这是问题中详述的问题的最佳解决方案 . 但是,这有一个小问题,请参阅以下内容 .

注意:

在再次查看“硬”任务之后,我发现它们可以在许多实例化中并行化 . 因此,在那里添加线程也是下一个合乎逻辑的步骤 . 这些都是叶子节点,他们所做的工作最好用完成服务完成(在某些情况下,子作业可以有不同的运行时,但任何2个结果都可以构建一个新的工作) . 要使用ForkJoinPool执行它们,我必须使用managedBlock()并实现ForkJoinPool.ManagedBlocker,这会使代码更复杂 . 但是,同时在这些离开节点中使用CompletionService意味着我的基于方法2的解决方案也可能需要等待线程,所以我最好选择ForkJoinPool .

3 回答

  • 1

    您似乎有一个并行的“分而治之”类型的问题,您可以通过递归方式将问题分解为使用可用内核“解决”的子问题 .

    你是正确的,创建线程的niave实现可能会使用大量资源,并且使用有界线程池很可能会死锁 .

    第三种选择是在Java 7中实现的"fork/join"模型 . 这在Oracle Java教程(here)中有所描述,但我认为Dan Grossman的讲义更能解释它:

  • 1

    要完全避免死锁,请不要使用同步Future.get() . 使用异步方法CompletableFuture.then和CompletableFuture.both,在Java8中可用 . 这些方法不会阻止,但在数据可用时提交新任务 . 如果您不想使用Java8,请查看Guava库,我相信它具有相同的功能 . 存在其他异步库,例如我的https://github.com/rfqu/df4j它的优点是可以重用任务对象,因此必须创建较少数量的对象 . 如果您提供有关问题的更详细描述(例如,以普通的顺序形式,或使用无限数量的线程),我可以帮助您使用df4j实现您的程序 .

  • 0

    我不得不离开 ForkJoinPool ,它没有最佳地使用线程 . 虽然它对Loop和Split节点工作正常,但如果我想并行化实际工作发生的叶节点,它就不再起作用了 . 当我将它们添加为 RecursiveTask 时,则大多数线程都处于空闲状态 . join() 调用由于某种原因(jdk1.7.0_45)没有窃取叶子中的工作 . 它在等待 . 在我的情况下,所有的工作都在叶子中,所以使用a叶子的自定义 RecursiveTask 子类比仅用于循环和分割节点更糟糕(因为它在部分工作之后等待,否则它在所有工作之后等待) . 我不认为我使用 ForkJoinPool 错了,如果你谷歌你发现有类似问题的人 .

    我现在做了一个简单的解决方案:2个线程池,1个用于实际硬件工作的固定大小,以及用于所有Loop和Split节点的高速缓存池 . 我创建了 FakeRecursiveTask (扩展它而不是原始),所以我不必更改代码(对于循环和拆分) . 我使用 HardWork 作为叶子的基类只是很明显它是不同的东西,只需调用 doHardWork(work) .

    使用此解决方案,我的所有工作线程都会一直使用 . 由于树的大小有限,我应该永远不会用完辅助线程 . 实际上在我的情况下,它主要使用与工作线程相同数量的辅助线程(在我的情况下为8) .

    public class ThreadPool3 {
            private static int maxNumWorkerThreads;
            private static ExecutorService workerPool = null;
            private static ExecutorService helperPool = null;
    
            public static void initThreadPool(int maxNumWorkerThreads_) {
                    int availProcessors = Runtime.getRuntime().availableProcessors();
                    if (maxNumWorkerThreads_ <= 0) {
                            maxNumWorkerThreads_ = availProcessors;
                    }
                    maxNumWorkerThreads = maxNumWorkerThreads_;
    
                    if (availProcessors != maxNumWorkerThreads) {
                            System.out.println("WARN: maxNumWorkerThreads (" + maxNumWorkerThreads + ") != availProcessors (" + availProcessors + ")");
                    }
                    workerPool = Executors.newFixedThreadPool(maxNumWorkerThreads);
                    BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
                    helperPool = new ThreadPoolExecutor(0, 4 * maxNumWorkerThreads, 60, TimeUnit.MINUTES, workQueue, Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());
            }
    
    
            public static abstract class HardWork implements Callable<Void> {
                    @Override
                    public abstract Void call() throws Exception;
            }
    
            public static void doHardWork(List<HardWork> tasks) throws Exception {
                    workerPool.invokeAll(tasks);
            }
    
    
            /**
            * fake ForkJoinPoolInterface:
            *
            */
            public static abstract class FakeRecursiveTask<T> implements Callable<T> {
                    private Future<T> resultFuture = null;
    
                    /**
                    * fake interface:
                    */
                    public abstract T compute();
    
                    /**
                    * fake interface:
                    */
                    public T invoke() {
                            return compute();
                    }
    
                    /**
                    * fake interface:
                    */
                    public void fork() {
                            resultFuture = helperPool.submit(this);
                    }
    
                    /**
                    * fake interface:
                    */
                    public T join() {
                            try {
                                    return resultFuture.get();
                            }
                            catch (Exception e) {
                                    throw new RuntimeException(e);
                            }
                    }
    
                    @Override
                    public T call() throws Exception {
                            return compute();
                    }
            }
    
    
            public static void shutdownThreadPool() {
                    if (workerPool != null) {
                            workerPool.shutdown();
                    }
                    if (helperPool != null) {
                            helperPool.shutdown();
                    }
            }
    }
    

相关问题