等待 ExecutorService
的所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的工作 - 每个核心一个 . 现在我的设置如下:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
es.execute(new ComputeDTask(singleTable));
}
try{
es.wait();
}
catch (InterruptedException e){
e.printStackTrace();
}
ComputeDTask
实现了runnable . 这似乎正确执行任务,但代码在 wait()
上与 IllegalMonitorStateException
崩溃 . 这很奇怪,因为我玩了一些玩具示例,它似乎工作 .
uniquePhrases
包含数万个元素 . 我应该使用其他方法吗?我正在寻找尽可能简单的东西
14 回答
您可以使用
ExecutorService.invokeAll
方法,它将执行所有任务并等待所有线程完成其任务 .这是完整的javadoc
您还可以使用此方法的重载版本来指定超时 .
以下是带有
ExecutorService.invokeAll
的示例代码我还有一种情况,我有一组要爬网的文件 . 我从一个应该处理的初始“种子”文档开始,该文档包含指向其他文档的链接,这些文档也应该被处理,等等 .
在我的主程序中,我只想编写类似下面的内容,其中
Crawler
控制着一堆线程 .如果我想导航树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程将处理树中的所有节点,直到不再存在 .
我在JVM中找不到任何我认为有点令人惊讶的东西 . 所以我编写了一个类
AutoStopThreadPool
,可以直接使用它或子类来添加适合于域的方法,例如schedule(Document)
. 希望能帮助到你!AutoStopThreadPool Javadoc | Download
将您的任务提交到 Runner 然后等待调用方法 waitTillDone() ,如下所示:
要使用它,请添加此gradle / maven依赖项:
'com.github.matejtymes:javafixes:1.0'
有关详细信息,请查看此处:https://github.com/MatejTymes/JavaFixes或此处:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
如果等待
ExecutorService
中的所有任务完成并不是您的目标,而是等到特定批次的任务完成后,您可以使用CompletionService - 特别是ExecutorCompletionService .我们的想法是通过
CompletionService
创建一个Executor
,submit包含一些已知数量的任务,然后使用take()(阻止)或poll()(不支持)从完成队列中抽取相同数量的结果 . 一旦你've drawn all the expected results corresponding to the tasks you submitted, you know they'完成所有 .让我再说一次,因为界面并不明显:你必须知道你在
CompletionService
中放了多少东西才能知道要抽出多少东西 . 这对于take()
方法尤为重要:调用它一次太多,它会阻塞你的调用线程,直到其他一些线程将另一个作业提交给同一个CompletionService
.Java Concurrency in Practice书中有some examples showing how to use CompletionService .
听起来你需要ForkJoinPool并使用全局池来执行任务 .
美丽在
pool.awaitQuiescence
中,方法将阻止利用调用者的线程执行其任务,然后在它真的为空时返回 .最简单的方法是使用ExecutorService.invokeAll(),它可以在单行中执行您想要的操作 . 按照你的说法,你需要修改或包装
ComputeDTask
以实现Callable<>
,这可以给你更多的灵活性 . 可能在你的应用程序中有一个有意义的Callable.call()
实现,但如果不使用Executors.callable(),这里有一种方法可以包装它 .正如其他人所指出的那样,如果合适,您可以使用
invokeAll()
的超时版本 . 在这个例子中,answers
将包含一堆Future
,它将返回空值(参见Executors.callable()
的定义 . 可能你想要做的是轻微的重构,这样你就可以得到一个有用的答案,或者对底层的引用ComputeDTask
,但我不能从你的例子中说出来 .如果不清楚,请注意
invokeAll()
在完成所有任务之前不会返回 . (例如,如果被问到,answers
集合中的所有Future
将报告.isDone()
. )这样可以避免所有手动关机,等待终止等...并且如果需要,可以让您多次重复使用ExecutorService
多个周期 .有关SO的一些相关问题:
How to wait for all threads to finish
Return values from java threads
invokeAll() not willing to accept a Collection<Callable<t>>
Do I need to synchronize?
对于你的问题,这些都不是严格意义上的,但它们确实提供了一些关于人们如何认为
Executor
/ExecutorService
应该被使用的颜色 .IllegalMonitorStateException的根本原因:
从您的代码中,您刚刚在ExecutorService上调用了wait()而没有拥有锁 .
下面的代码将修复
IllegalMonitorStateException
按照以下方法之一等待完成已提交给
ExecutorService
的所有任务 .在
ExecutorService
上迭代submit
中的所有Future
任务,并在Future
对象上使用阻塞调用get()
检查状态在
ExecutorService
上使用invokeAll使用CountDownLatch
使用ForkJoinPool或newWorkStealingPool of
Executors
(自java 8起)按照oracle文档中的建议关闭池page
如果要在使用选项5而不是选项1到4时优雅地等待所有任务完成,请更改
至
a
while(condition)
每1分钟检查一次 .如果要等待执行程序服务完成执行,请调用
shutdown()
然后调用awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)
. ExecutorService不会阻止's own monitor, so you can' t使用wait
等 .一个简单的替代方法是使用线程和连接 . 参考:Joining Threads
只是用
在每个线程中
并作为障碍
如果要等待所有任务完成,请使用shutdown方法而不是
wait
. 然后用awaitTermination跟着它 .此外,您可以使用Runtime.availableProcessors来获取硬件线程的数量,以便您可以正确初始化线程池 .
我将等待执行程序以指定的超时终止,您认为它适合完成任务 .
你可以等待一段时间内完成工作:
或者您可以使用ExecutorService . submit (Runnable)并收集它返回的Future对象并依次调用 get() 以等待它们完成 .
InterruptedException对于正确处理非常重要 . 这使您或您的库的用户可以安全地终止长时间的过程 .
添加集合中的所有线程并使用
invokeAll
提交 . 如果您可以使用ExecutorService
的invokeAll
方法,则在所有线程完成之前,JVM将不会继续下一行 .这里有一个很好的例子:invokeAll via ExecutorService