我正在使用 java.util.concurrent.ThreadPoolExecutor
来并行处理多个项目 . 尽管线程本身工作正常,但由于线程中发生的操作,我们有时会遇到其他资源限制,这使我们想要调低池中线程的数量 .
我'd like to know if there'是一种在线程实际工作时拨打线程数的方法 . 我知道你可以调用 setMaximumPoolSize()
和/或 setCorePoolSize()
,但这些只会在线程空闲后调整池的大小,但是在队列中没有任务等待之前它们不会变为空闲 .
5 回答
据我所知,这是一个不太干净的方式是不可能的 .
您可以实现beforeExecute方法来检查一些布尔值并强制线程暂时停止 . 请记住,它们将包含一个在重新启用之前不会执行的任务 .
或者,您可以实现afterExecute以在饱和时抛出RuntimeException . 这将有效地导致Thread死亡,并且由于Executor将高于最大值,因此不会创建新的 .
我不建议你这样做 . 相反,尝试找到一些其他方法来控制导致问题的任务的并发执行 . 可能通过在具有更有限数量的工作者的单独线程池中执行它们 .
你绝对可以 . 调用
setCorePoolSize(int)
将更改池的核心大小 . 对此方法的调用是线程安全的,并覆盖提供给ThreadPoolExecutor
的构造函数的设置 . 如果要修剪池大小,其余线程将在其当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭) . 如果要增加池大小,将尽快分配新线程 . 分配新线程的时间范围没有记录 - 但在实现中,每次调用execute
方法时都会执行新线程的分配 .要将其与运行时可调参数作业服务器场配对,您可以将此属性(通过包装器或使用动态MBean导出器)公开为读写JMX属性,以创建一个相当不错的,即时可调的批处理器 .
要在运行时强制减小池大小(这是您的请求),您必须子类化
ThreadPoolExecutor
并添加对beforeExecute(Thread,Runnable)
方法的中断 . 中断线程不是一个充分的中断,因为它只与等待状态交互,并且在处理期间,任务线程不会进入可中断状态 .我最近遇到了同样的问题,试图在执行所有提交的任务之前强制终止线程池 . 为了实现这一点,我通过在将线程的
UncaughtExceptionHandler
替换为期望我的特定异常并丢弃它的线程之后抛出运行时异常来中断线程 .在您的情况下,您可能想要创建一个没有许可证的信号量(仅用作线程安全计数器),并且在关闭线程时向其释放许多许可,这些许可对应于先前核心池大小的增量和新池大小(要求您覆盖
setCorePoolSize(int)
方法) . 这将允许您在当前任务完成后终止线程 .这应该中断n个线程f(n)= active - request . 如果有任何问题,
ThreadPoolExecutor
的分配策略相当持久 . 它使用保证执行的finally
块保持提前终止 . 因此,即使您终止太多线程,它们也会重新填充 .解决方案是耗尽ThreadPoolExecutor队列,根据需要设置ThreadPoolExecutor大小,然后在其他线程结束时逐个添加线程 . 在ThreadPoolExecutor类中排空队列的方法是私有的,因此您必须自己创建它 . 这是代码:
在调用此方法之前,您需要获取然后释放主锁 . 为此,您需要使用java反射,因为字段“mainLock”是私有的 . 再次,这是代码:
“执行者”是你的ThreadPoolExecutor .
现在你需要锁定/解锁方法:
最后,您可以编写“setThreadsNumber”方法,它可以增加和减少ThreadPoolExecutor大小:
注意:显然如果你执行N个并行线程并且你将这个数字改为N-1,那么所有N个线程都会继续跑 . 当第一个线程结束时,不会执行新线程 . 从现在开始,并行线程的数量将是您选择的数量 .
我阅读了setMaximumPoolSize()和setCorePoolSize()的文档,看起来它们可以产生你需要的行为 .
我的结论是错误的:请参阅下面的讨论了解详情......
我也需要相同的解决方案,似乎在JDK8中setCorePoolSize()和setMaximumPoolSize()确实产生了所需的结果 . 我做了一个测试用例,我向池中提交了4个任务,并且它们执行得非常简单,我在运行时缩小了池的大小,并提交了另一个我想要寂寞的可运行的东西 . 然后我将池恢复到原始大小 . 这是测试源https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381
它产生以下输出(线程“50”是应该孤立执行的那个)