首页 文章

fork / join框架如何比线程池更好?

提问于
浏览
113

使用新的fork/join framework而不仅仅是在开始时将大任务分成N个子任务,将它们发送到缓存的线程池(来自Executors)并等待每个任务完成,有什么好处?我没有看到使用fork / join抽象如何简化问题或使解决方案从我们多年来的工作中提高效率 .

例如,tutorial example中的并行模糊算法可以像这样实现:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

在开头拆分并将任务发送到线程池:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

任务转到线程池的队列,当工作线程可用时,它们将从该队列执行 . 只要拆分足够精细(以避免必须特别等待最后一个任务)并且线程池有足够的(至少N个处理器)线程,所有处理器都在全速工作,直到整个计算完成 .

我错过了什么吗?使用fork / join框架的附加 Value 是什么?

10 回答

  • 2

    另一个重要的区别似乎是,使用F-J,你可以做多个复杂的阶段 . 考虑合并排序http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html,预分割这项工作需要太多的编排 . 例如你需要做以下事情:

    • 排序第一季度

    • 排序第二季度

    • 合并前两个季度

    • 排序第三季度

    • 排序第四季度

    • 合并了最近2个季度

    • 合并了两半

    如何指定在合并之前必须进行排序等 .

    我一直在研究如何最好地为每个项目列表做某件事 . 我想我会预先拆分列表并使用标准的ThreadPool . 当工作不能被预分割成足够独立的任务时,F-J似乎最有用,但是可以递归地分割成它们之间独立的任务(例如,将两半分开是独立的,但是将两个分类的半部合并为有序整体不是) .

  • 6

    我认为基本的误解是,Fork / Join示例显示工作 stealing 但只是某种标准的分而治之 .

    工作窃取将是这样的: Worker B已完成他的工作 . 他是一个善良的人,所以他环顾四周,看到 Worker A仍然非常努力 . 他漫步并问道:“嘿小伙子,我可以帮你一把 . ”回复 . “很酷,我有1000个单位的任务 . 到目前为止,我已经完成了345个离开655.你能不能在673到1000号工作,我会做346到672个 . ” B说“好的,我们先来吧,我们可以早点去酒吧 . ”

    你看 - 即使他们开始真正的工作, Worker 也必须彼此沟通 . 这是示例中缺少的部分 .

    另一方面,示例仅显示“使用分包商”之类的内容:

    Worker A:“Dang,我有1000个单位的工作 . 对我来说太多了 . 我自己会做500个并将500个转包给其他人 . ”这种情况一直持续到大任务被分解为每个10个单元的小包 . 这些将由可用的工作人员执行 . 但是,如果一个包是一种毒丸并且需要比其他包更长的时间 - 运气不好,分裂阶段就结束了 .

    Fork / Join和预先拆分任务之间唯一的区别是:在前期拆分时,您可以从一开始就完整地处理工作队列 . 示例:1000个单位,阈值为10,因此队列有100个条目 . 这些数据包被分发给线程池成员 .

    Fork / Join更复杂,并尝试将队列中的数据包数量保持较小:

    • 步骤1:将包含(1 ... 1000)的一个数据包放入队列

    • 步骤2:一个工作人员弹出数据包(1 ... 1000)并用两个数据包替换它:(1 ... 500)和(501 ... 1000) .

    • 步骤3:一名 Worker 弹出数据包(500 ... 1000)并推送(500 ... 750)和(751 ... 1000) .

    • 步骤n:堆栈包含以下数据包:(1..500),(500 ... 750),(750 ... 875)...(991..1000)

    • 步骤n 1:弹出并执行包(991..1000)

    • 步骤n 2:弹出并执行包(981..990)

    • 步骤n 3:弹出包(961..980)并分成(961 ... 970)和(971..980) . ....

    您会看到:在Fork / Join中,队列较小(示例中为6),并且“split”和“work”阶段是交错的 .

    当多个 Worker 同时弹出和推动时,互动当然不是那么清楚 .

  • 11

    如果你有n个繁忙线程都是100%独立工作,那么这将比Fork-Join(FJ)池中的n个线程更好 . 但它从来没有这样做过 .

    可能无法将问题精确地分成n个相等的部分 . 即使你这样做,线程调度也是不公平的 . 你最终会等待最慢的线程 . 如果你有多个任务,那么它们每个都可以以低于n路的并行性运行(通常效率更高),但是当其他任务完成时,它会向上运行 .

    那么为什么我们不把问题简化为FJ大小的部分并且有一个线程池工作 . 典型的FJ使用将问题分解成小块 . 以随机顺序执行这些操作需要在硬件级别进行大量协调 . 开销将是一个杀手 . 在FJ中,任务被放入队列中,线程以后进先出顺序(LIFO /堆栈)读取,并且工作窃取(通常在核心工作中)先进先出(FIFO /“队列”) . 结果是长阵列处理可以在很大程度上顺序完成,即使它已被破坏分成小块 . (同样的情况是,在一次大爆炸中将问题分解成小的均匀大小的块可能并不是一件轻而易举的事 . 假设在没有 balancer 的情况下处理某种形式的层次结构 . )

    结论:FJ允许在不 balancer 的情况下更有效地使用硬件线程,如果您有多个线程,则总是如此 .

  • 13

    Fork / join与线程池不同,因为它实现了工作窃取 . 来自Fork/Join

    与任何ExecutorService一样,fork / join框架将任务分配给线程池中的工作线程 . fork / join框架是不同的,因为它使用了工作窃取算法 . 用尽要做的事情的工作线程可以从仍然忙碌的其他线程中窃取任务 .

    假设你有两个线程,4个任务a,b,c,d分别需要1,1,5和6秒 . 最初,a和b分配给线程1,c和d分配给线程2.在线程池中,这将花费11秒 . 使用fork / join,线程1完成并可以从线程2中窃取工作,因此任务d最终将由线程1执行 . 线程1执行a,b和d,线程2只是c . 总时间:8秒,而不是11秒 .

    编辑:正如Joonas指出的那样,任务不一定预先分配给一个线程 . fork / join的想法是线程可以选择将任务拆分为多个子块 . 所以要重申以上内容:

    我们有两个任务(ab)和(cd),分别需要2秒和11秒 . 线程1开始执行ab并将其分成两个子任务a和b . 与线程2类似,它分为两个子任务c&d . 当线程1完成a&b时,它可以从线程2中窃取d .

  • 1

    上面的每个人都是正确的,通过工作窃取可以获得好处,但要扩展其原因 .

    主要好处是工作线程之间的有效协调 . 工作必须分开并重新组装,这需要协调 . 正如您在A.H的答案中所见,每个线程都有自己的工作清单 . 此列表的一个重要属性是它已排序(顶部的大型任务和底部的小型任务) . 每个线程执行其列表底部的任务,并从其他线程列表的顶部窃取任务 .

    结果是:

    • 任务列表的头部和尾部可以独立同步,减少列表上的争用 .

    • 工作的重要子树被同一个线程拆分并重新组装,因此这些子树不需要线程间协调 .

    • 当一个线程窃取工作时,它需要一个大块然后再细分到它自己的列表中

    • 钢结构工作意味着螺纹几乎完全被利用,直到工艺结束 .

    使用线程池的大多数其他分而治之的方案需要更多的线程间通信和协调 .

  • 11

    在此示例中,Fork / Join不添加任何值,因为不需要分叉,并且工作负载在工作线程之间均匀分配 . Fork / Join只会增加开销 .

    这是关于这个主题的nice article . 引用:

    总的来说,我们可以说ThreadPoolExecutor是工作线程均匀分配工作负载的首选 . 为了能够保证这一点,您需要准确了解输入数据的外观 . 相比之下,ForkJoinPool无论输入数据如何都能提供良好的性能,因此是一种更加强大的解决方案 .

  • 25

    线程池和Fork / Join的最终目标是相似的:两者都希望尽可能利用可用的CPU功率来实现最大吞吐量 . 最大吞吐量意味着应该在很长一段时间内完成尽可能多的任务 . 需要做什么? (对于以下内容,我们假设计算任务并不缺乏:100%的CPU利用率总是足够 . 另外,在超线程的情况下,我使用“CPU”等效于核心或虚拟核心) .

    • 至少需要尽可能多的线程运行,因为有可用的CPU,因为运行较少的线程将使核心未使用 .

    • 最多必须有尽可能多的线程运行,因为有可用的CPU,因为运行更多线程将为调度程序创建额外的负载,调度程序将CPU分配给不同的线程,这会导致一些CPU时间进入调度程序而不是我们的计算任务 .

    因此,我们发现,为了获得最大吞吐量,我们需要拥有与CPU完全相同的线程数 . 在Oracle的模糊示例中,您可以使用数字来获取固定大小的线程池线程数等于可用CPU数或使用线程池 . 它没有区别,你是对的!

    So when will you get into trouble with a thread pools? That is if a thread blocks ,因为您的线程正在等待另一个任务完成 . 假设以下示例:

    class AbcAlgorithm implements Runnable {
        public void run() {
            Future<StepAResult> aFuture = threadPool.submit(new ATask());
            StepBResult bResult = stepB();
            StepAResult aResult = aFuture.get();
            stepC(aResult, bResult);
        }
    }
    

    我们在这里看到的是一个由三个步骤A,B和C组成的算法.A和B可以彼此独立地执行,但是步骤C需要步骤A和B的结果 . 这个算法的作用是将任务A提交给线程池并直接执行任务b . 之后,线程也将等待任务A完成并继续执行步骤C.如果A和B同时完成,那么一切都很好 . 但是,如果A需要比B更长的时间呢?这可能是因为任务A的性质决定了它,但也可能是这种情况,因为任务A的开头没有可用的线程,任务A需要等待 . (如果只有一个CPU可用,因此你的线程池只有一个线程,这甚至会导致死锁,但现在除了这一点之外) . 关键是刚刚执行任务B blocks the whole thread 的线程 . 由于我们拥有与CPU相同数量的线程,并且一个线程被阻止,这意味着 one CPU is idle .

    Fork / Join解决了这个问题:在fork / join框架中,您将编写相同的算法,如下所示:

    class AbcAlgorithm implements Runnable {
        public void run() {
            ATask aTask = new ATask());
            aTask.fork();
            StepBResult bResult = stepB();
            StepAResult aResult = aTask.join();
            stepC(aResult, bResult);
        }
    }
    

    看起来一样,不是吗?然而,线索是 aTask.join will not block . 相反,这里是 work-stealing 发挥作用:线程将环顾四周过去已经分叉的其他任务,并将继续这些任务 . 首先,它检查它已经分叉的任务是否已经开始处理 . 因此,如果A尚未被另一个线程启动,它将执行A next,否则它将检查其他线程的队列并窃取他们的工作 . 一旦另一个线程的另一个任务完成,它将检查A是否现在完成 . 如果是上面的算法可以调用 stepC . 否则它将寻找另一个偷窃的任务 . 因此 fork/join pools can achieve 100% CPU utilisation, even in the face of blocking actions .

    但是有一个陷阱:工作窃取只能用于 ForkJoinTaskjoin 调用 . 无法对外部阻塞操作(如等待另一个线程或等待I / O操作)执行此操作 . 那么等待I / O完成是一项常见的任务呢?在这种情况下,如果我们可以向Fork / Join池添加一个额外的线程,一旦阻塞操作完成就会再次停止,这将是第二个最好的事情 . 如果我们使用 ManagedBlockerForkJoinPool 实际上可以做到这一点 .

    斐波那契

    JavaDoc for RecursiveTask中是使用Fork / Join计算Fibonacci数的示例 . 对于经典的递归解决方案,请参阅:

    public static int fib(int n) {
        if (n <= 1) {
            return n;
        }
        return fib(n - 1) + fib(n - 2);
    }
    

    正如在JavaDocs中所解释的,这是计算斐波纳契数的一种非常好的转储方法,因为该算法具有O(2 ^ n)复杂度,而更简单的方法是可能的 . 但是这个算法非常简单易懂,所以我们坚持使用它 . 让我们假设我们想要使用Fork / Join加快速度 . 一个天真的实现看起来像这样:

    class Fibonacci extends RecursiveTask<Long> {
        private final long n;
    
        Fibonacci(long n) {
            this.n = n;
        }
    
        public Long compute() {
            if (n <= 1) {
                return n;
            }
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
       }
    }
    

    这个任务被拆分的步骤太短,因此会执行得非常糟糕,但你可以看到框架通常如何工作得很好:两个加法可以独立计算,但是我们需要它们两个来构建最终的结果 . 所以一半是在另一个线程中完成的 . 在线程池中做同样的事情而不会出现死锁(可能,但不是那么简单) .

    只是为了完整性:如果您真的想使用这种递归方法计算斐波纳契数,这里是一个优化版本:

    class FibonacciBigSubtasks extends RecursiveTask<Long> {
        private final long n;
    
        FibonacciBigSubtasks(long n) {
            this.n = n;
        }
    
        public Long compute() {
            return fib(n);
        }
    
        private long fib(long n) {
            if (n <= 1) {
                return 1;
            }
            if (n > 10 && getSurplusQueuedTaskCount() < 2) {
                final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
                final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
                f1.fork();
                return f2.compute() + f1.join();
            } else {
                return fib(n - 1) + fib(n - 2);
            }
        }
    }
    

    这使得子任务保持更小,因为它们仅在 n > 10 && getSurplusQueuedTaskCount() < 2 为真时被拆分,这意味着要进行的操作( n > 10 )显着超过100个,并且没有非常的人工任务等待( getSurplusQueuedTaskCount() < 2 ) .

    在我的计算机上(4核(计数超线程时为8核),英特尔(R)核(TM)i7-2720QM CPU @ 2.20GHz) fib(50) 采用经典方法需要64秒,使用Fork / Join方法只需18秒这是一个非常明显的收获,虽然没有理论上可能的那么多 .

    摘要

    • 是的,在您的示例中,Fork / Join没有经典线程池的优势 .

    • Fork / Join可以在涉及阻塞时显着提高性能

    • Fork / Join绕过了一些死锁问题

  • 8

    当您进行昂贵的合并操作时,F / J也具有明显的优势 . 因为它分裂为树结构,所以只进行log2(n)合并,而不是n合并线性线程分裂 . (这确实使理论假设你拥有与线程一样多的处理器,但仍然是一个优势)对于家庭作业,我们必须通过对每个索引处的值求和来合并数千个2D阵列(所有相同的维度) . 使用fork join和P处理器,当P接近无穷大时,时间接近log2(n) .

    1 2 3 .. 7 3 1 .... 8 5 4
    4 5 6 2 4 3 => 6 9 9
    7 8 9 .. 1 1 0 .... 8 9 9

  • 12

    如果问题是我们必须等待其他线程完成(如排序数组或数组之和),应该使用fork join,因为Executor(Executors.newFixedThreadPool(2))会因限制而窒息线程数 . 在这种情况下,forkjoin池将创建更多线程,以掩盖阻塞线程以保持相同的并行性

    Source: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

    执行分而治之算法的执行程序的问题与创建子任务无关,因为Callable可以自由地向其执行程序提交新的子任务并以同步或异步方式等待其结果 . 问题在于并行性:当Callable等待另一个Callable的结果时,它处于等待状态,因此浪费了处理排队等待执行的另一个Callable的机会 .

    通过Doug Lea的努力添加到Java SE 7中的java.util.concurrent包中的fork / join框架填补了这一空白

    Source: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

    池尝试通过动态添加,挂起或恢复内部工作线程来维护足够的活动(或可用)线程,即使某些任务停止等待加入其他任务也是如此 . 但是,面对阻塞的IO或其他非托管同步,不能保证这样的调整

    public int getPoolSize()返回已启动但尚未终止的工作线程数 . The result returned by this method may differ from getParallelism() when threads are created to maintain parallelism when others are cooperatively blocked.

  • 122

    在像爬虫这样的应用程序中,您会对ForkJoin的性能感到惊讶 . 这是你要学习的最好的tutorial .

    Fork / Join的逻辑很简单:(1)将每个大任务分开(分叉)成较小的任务; (2)在一个单独的线程中处理每个任务(如果需要,将它们分成更小的任务); (3)加入结果 .

相关问题