首页 文章

ExecutorService,如何等待所有任务完成

提问于
浏览
160

等待 ExecutorService 的所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的工作 - 每个核心一个 . 现在我的设置如下:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask 实现了runnable . 这似乎正确执行任务,但代码在 wait() 上与 IllegalMonitorStateException 崩溃 . 这很奇怪,因为我玩了一些玩具示例,它似乎工作 .

uniquePhrases 包含数万个元素 . 我应该使用其他方法吗?我正在寻找尽可能简单的东西

14 回答

  • 0

    您可以使用 ExecutorService.invokeAll 方法,它将执行所有任务并等待所有线程完成其任务 .

    这是完整的javadoc

    您还可以使用此方法的重载版本来指定超时 .

    以下是带有 ExecutorService.invokeAll 的示例代码

    public class Test {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(3);
            List<Callable<String>> taskList = new ArrayList<>();
            taskList.add(new Task1());
            taskList.add(new Task2());
            List<Future<String>> results = service.invokeAll(taskList);
            for (Future<String> f : results) {
                System.out.println(f.get());
            }
        }
    
    }
    
    class Task1 implements Callable<String> {
        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(2000);
                return "Task 1 done";
            } catch (Exception e) {
                e.printStackTrace();
                return " error in task1";
            }
        }
    }
    
    class Task2 implements Callable<String> {
        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(3000);
                return "Task 2 done";
            } catch (Exception e) {
                e.printStackTrace();
                return " error in task2";
            }
        }
    }
    
  • 188

    我还有一种情况,我有一组要爬网的文件 . 我从一个应该处理的初始“种子”文档开始,该文档包含指向其他文档的链接,这些文档也应该被处理,等等 .

    在我的主程序中,我只想编写类似下面的内容,其中 Crawler 控制着一堆线程 .

    Crawler c = new Crawler();
    c.schedule(seedDocument); 
    c.waitUntilCompletion()
    

    如果我想导航树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程将处理树中的所有节点,直到不再存在 .

    我在JVM中找不到任何我认为有点令人惊讶的东西 . 所以我编写了一个类 AutoStopThreadPool ,可以直接使用它或子类来添加适合于域的方法,例如 schedule(Document) . 希望能帮助到你!

    AutoStopThreadPool Javadoc | Download

  • 6

    将您的任务提交到 Runner 然后等待调用方法 waitTillDone() ,如下所示:

    Runner runner = Runner.runner(2);
    
    for (DataTable singleTable : uniquePhrases) {
    
        runner.run(new ComputeDTask(singleTable));
    }
    
    // blocks until all tasks are finished (or failed)
    runner.waitTillDone();
    
    runner.shutdown();
    

    要使用它,请添加此gradle / maven依赖项: 'com.github.matejtymes:javafixes:1.0'

    有关详细信息,请查看此处:https://github.com/MatejTymes/JavaFixes或此处:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

  • 3

    如果等待 ExecutorService 中的所有任务完成并不是您的目标,而是等到特定批次的任务完成后,您可以使用CompletionService - 特别是ExecutorCompletionService .

    我们的想法是通过 CompletionService 创建一个 Executorsubmit包含一些已知数量的任务,然后使用take()(阻止)或poll()(不支持)从完成队列中抽取相同数量的结果 . 一旦你've drawn all the expected results corresponding to the tasks you submitted, you know they'完成所有 .

    让我再说一次,因为界面并不明显:你必须知道你在 CompletionService 中放了多少东西才能知道要抽出多少东西 . 这对于 take() 方法尤为重要:调用它一次太多,它会阻塞你的调用线程,直到其他一些线程将另一个作业提交给同一个 CompletionService .

    Java Concurrency in Practice书中有some examples showing how to use CompletionService .

  • 2

    听起来你需要ForkJoinPool并使用全局池来执行任务 .

    public static void main(String[] args) {
        // the default `commonPool` should be sufficient for many cases.
        ForkJoinPool pool = ForkJoinPool.commonPool(); 
        // The root of your task that may spawn other tasks. 
        // Make sure it submits the additional tasks to the same executor that it is in.
        Runnable rootTask = new YourTask(pool); 
        pool.execute(rootTask);
        pool.awaitQuiescence(...);
        // that's it.
    }
    

    美丽在 pool.awaitQuiescence 中,方法将阻止利用调用者的线程执行其任务,然后在它真的为空时返回 .

  • 5

    最简单的方法是使用ExecutorService.invokeAll(),它可以在单行中执行您想要的操作 . 按照你的说法,你需要修改或包装 ComputeDTask 以实现 Callable<> ,这可以给你更多的灵活性 . 可能在你的应用程序中有一个有意义的 Callable.call() 实现,但如果不使用Executors.callable(),这里有一种方法可以包装它 .

    ExecutorService es = Executors.newFixedThreadPool(2);
    List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());
    
    for (DataTable singleTable: uniquePhrases) { 
        todo.add(Executors.callable(new ComputeDTask(singleTable))); 
    }
    
    List<Future<Object>> answers = es.invokeAll(todo);
    

    正如其他人所指出的那样,如果合适,您可以使用 invokeAll() 的超时版本 . 在这个例子中, answers 将包含一堆 Future ,它将返回空值(参见 Executors.callable() 的定义 . 可能你想要做的是轻微的重构,这样你就可以得到一个有用的答案,或者对底层的引用 ComputeDTask ,但我不能从你的例子中说出来 .

    如果不清楚,请注意 invokeAll() 在完成所有任务之前不会返回 . (例如,如果被问到, answers 集合中的所有 Future 将报告 .isDone() . )这样可以避免所有手动关机,等待终止等...并且如果需要,可以让您多次重复使用 ExecutorService 多个周期 .

    有关SO的一些相关问题:

    对于你的问题,这些都不是严格意义上的,但它们确实提供了一些关于人们如何认为 Executor / ExecutorService 应该被使用的颜色 .

  • 0

    IllegalMonitorStateException的根本原因:

    抛出以指示线程已尝试在对象的监视器上等待或通知在对象的监视器上等待而不拥有指定的监视器的其他线程 .

    从您的代码中,您刚刚在ExecutorService上调用了wait()而没有拥有锁 .

    下面的代码将修复 IllegalMonitorStateException

    try 
    {
        synchronized(es){
            es.wait(); // Add some condition before you call wait()
        }
    }
    

    按照以下方法之一等待完成已提交给 ExecutorService 的所有任务 .

    • ExecutorService 上迭代 submit 中的所有 Future 任务,并在 Future 对象上使用阻塞调用 get() 检查状态

    • ExecutorService 上使用invokeAll

    • 使用CountDownLatch

    • 使用ForkJoinPoolnewWorkStealingPool of Executors (自java 8起)

    • 按照oracle文档中的建议关闭池page

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    如果要在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    a while(condition) 每1分钟检查一次 .

  • 51

    如果要等待执行程序服务完成执行,请调用 shutdown() 然后调用awaitTermination(units, unitType),例如 awaitTermination(1, MINUTE) . ExecutorService不会阻止's own monitor, so you can' t使用 wait 等 .

  • 44

    一个简单的替代方法是使用线程和连接 . 参考:Joining Threads

  • 9

    只是用

    latch = new CountDownLatch(noThreads)
    

    在每个线程中

    latch.countDown();
    

    并作为障碍

    latch.await();
    
  • 5

    如果要等待所有任务完成,请使用shutdown方法而不是 wait . 然后用awaitTermination跟着它 .

    此外,您可以使用Runtime.availableProcessors来获取硬件线程的数量,以便您可以正确初始化线程池 .

  • 0

    我将等待执行程序以指定的超时终止,您认为它适合完成任务 .

    try {  
             //do stuff here 
             exe.execute(thread);
        } finally {
            exe.shutdown();
        }
        boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
        if (!result)
    
        {
            LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
        }
    
  • 3

    你可以等待一段时间内完成工作:

    int maxSecondsPerComputeDTask = 20;
    try {
        while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
            // consider giving up with a 'break' statement under certain conditions
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);    
    }
    

    或者您可以使用ExecutorService . submit (Runnable)并收集它返回的Future对象并依次调用 get() 以等待它们完成 .

    ExecutorService es = Executors.newFixedThreadPool(2);
    Collection<Future<?>> futures = new LinkedList<<Future<?>>();
    for (DataTable singleTable : uniquePhrases) {
        futures.add(es.submit(new ComputeDTask(singleTable)));
    }
    for (Future<?> future : futures) {
       try {
           future.get();
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       } catch (ExecutionException e) {
           throw new RuntimeException(e);
       }
    }
    

    InterruptedException对于正确处理非常重要 . 这使您或您的库的用户可以安全地终止长时间的过程 .

  • 1

    添加集合中的所有线程并使用 invokeAll 提交 . 如果您可以使用 ExecutorServiceinvokeAll 方法,则在所有线程完成之前,JVM将不会继续下一行 .

    这里有一个很好的例子:invokeAll via ExecutorService

相关问题