我的问题与this one here密切相关 . 正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成 . 然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理 . 这使收集所有这些任务的未来变得有点尴尬 .
我们当前的解决方案使用忙等待循环来等待终止:
do { //Wait until we are done the processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (!executor.getQueue().isEmpty()
|| numTasks.longValue() > executor.getCompletedTaskCount());
numTasks是一个在创建每个新任务时增加的值 . 这有效但我认为由于忙碌的等待而不是很好 . 我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒 .
9 回答
非常感谢你的所有建议!
最后,我选择了一些我认为相当简单的东西 . 我发现CountDownLatch几乎就是我所需要的 . 它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务 . 因此,我实现了一个新类
CountLatch
,它提供了额外的功能 . (见下文)这个课我然后使用如下 .主线程调用
latch.awaitZero()
,阻塞直到锁存器达到0 .任何线程,在调用
executor.execute(..)
之前调用latch.increment()
.在完成之前的任何任务都会调用
latch.decrement()
.当最后一个任务终止时,计数器将达到0,从而释放主线程 .
欢迎提供进一步的建议和反馈!
请注意,
increment()
/decrement()
调用可以封装到自定义的Executor
子类中,例如,由Sami Korhonen建议,或者如impl所建议的beforeExecute
和afterExecute
. 看这里:Java 7提供了一个适合这个名为Phaser的用例的同步器 . 它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch) .
在此方案中使用移相器的基本模式是在创建时使用移相器执行register任务,并在完成时使用arrive . 当到达方的数量与注册的数量相匹配时,相位器将进入下一阶段,并在进行时通知任何预先通过的线程 .
这是我创建的等待递归任务完成的示例 . 为了演示目的,它天真地找到Fibonacci序列的前几个数字:
这个实际上是一个很有趣的问题需要解决 . 我必须警告我没有完全测试代码 .
想法是简单地跟踪任务执行:
如果任务成功排队,则计数器加1
如果任务被取消且尚未执行,则计数器减1
如果已执行任务,则计数器减1
当调用shutdown并且有待处理的任务时,delegate不会在实际的ExecutorService上调用shutdown . 它将允许排队新任务,直到挂起的任务计数达到零,并在实际的ExecutorService上调用shutdown .
你为什么不用柜台?例如:
在将任务提交到队列之前将计数器递增1:
并在任务结束时将其减1:
检查将是这样的:
Java 7通过其ForkJoinPool executor结合了对递归任务的支持 . 它是quite simple to use并且可以很好地扩展,只要任务本身不是太微不足道 . 本质上,它提供了一个受控接口,允许任务等待任何子任务的完成,而不会无限期地阻塞底层线程 .
您链接到的答案中建议的选项之一是使用CompletionService
您可以使用以下命令替换主线程中的忙等待:
请注意,
getCompletedTaskCount
仅返回一个近似数字,因此您可能需要找到更好的退出条件 .如果您知道要等待的线程数,可以使用CountDownLatch(http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html)粘贴一行代码来增加每个线程的数量它可以解决您的问题
由于上一个任务不知道它是最后一个,我实际上认为没有记录就可以100%正确地完成这项工作在任务启动和完成任务时 .
如果内存对我有用,
getQueue()
方法将返回一个队列,该队列仅包含仍在等待执行的任务,而不是当前正在运行的任务 . 此外,getCompletedTaskCount()
是近似值 .我回答的解决方案和Condition用于发出唤醒主线程的信号(为简单起见,请原谅快捷方式):
它可能会更优雅(可能只是在你的线程池 Actuator 中提供一个
getState()
或者什么?),但我认为它应该完成工作 . 这也是未经测试的,所以要自己承担责任......值得注意的是,如果没有任务要执行,这个解决方案肯定会失败 - 它将无限期地等待信号 . 因此,如果您没有要运行的任务,甚至不必费心启动执行程序 .
Edit: 第二个想法,增加原子计数器应该在提交时发生,而不是在任务执行之前立即发生(因为排队可能导致计数器过早地降到0) . 替代
submit(...)
方法可能是有意义的,也可能是remove(...)
和shutdown()
(如果你使用它们) . 但总体思路仍然相同 . (但我想的越多,它就越不漂亮 . )我还要查看 class 的内部,看看你是否可以从中收集任何知识:http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java .
tryTerminate()
方法看起来很有趣 .您可以使用原子计数器来计算提交(就像在实际提交之前所说的那样) . 将其与信号量结合并将其释放到
afterExecute
提供的afterExecute
钩子中 . 在提交第一轮工作后,请致电semaphore.acquire( counter.get())
而不是忙碌等待 . 但是,当调用获取时,获取的数量将太小,因为计数器可能会在以后增加 . 您必须循环获取调用,并将自上次调用后的增加作为参数,直到计数器不再增加 .