用于ThreadPoolExecutor的JavaDoc不清楚是否可以将任务直接添加到支持执行程序的 BlockingQueue
. The docs say调用 executor.getQueue()
是"intended primarily for debugging and monitoring" .
我正用自己的 BlockingQueue
构建一个 ThreadPoolExecutor
. 我保留对队列的引用,以便我可以直接向其添加任务 . getQueue()
返回相同的队列,因此我假设 getQueue()
中的警告适用于通过我的方式获取的后备队列的引用 .
示例
代码的一般模式是:
int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
queue.offer()vs executor.execute()
据我了解,典型的用途是通过 executor.execute()
添加任务 . 上面示例中的方法具有阻塞队列的优点,而如果队列已满, execute()
会立即失败并拒绝我的任务 . 我也喜欢提交作业与阻塞队列交互;这对我来说更像是 生产环境 者 - 消费者 .
直接向队列添加任务的含义:我必须调用 prestartAllCoreThreads()
,否则没有工作线程正在运行 . 假设没有与执行程序的其他交互,没有任何东西将监视队列(检查 ThreadPoolExecutor
源确认这一点) . 这也意味着直接排队 ThreadPoolExecutor
必须另外配置为> 0个核心线程,并且不得配置为允许核心线程超时 .
tl;博士
鉴于 ThreadPoolExecutor
配置如下:
-
核心线程> 0
-
核心线程不允许超时
-
核心线程是预先启动的
-
持有对支持执行者的
BlockingQueue
的引用
将任务直接添加到队列而不是调用 executor.execute()
是否可以接受?
相关
这个问题(producer/consumer work queues)类似,但没有具体涉及直接添加到队列 .
5 回答
如果是我,我宁愿使用
Executor#execute()
而不是Queue#offer()
,因为我已经使用了java.util.concurrent
中的所有其他内容 .你的问题很好,引起了我的兴趣,所以我看了
ThreadPoolExecutor#execute()
的来源:我们可以看到execute本身在工作队列中调用了
offer()
,但是在必要之前没有做一些漂亮,美味的池操作 . 因此,我建议使用execute()
;不使用它可能(虽然我认为使用offer()
会破坏执行程序 - 看起来任务是使用以下(也来自ThreadPoolExecutor)从队列中拉出的:这个
getTask()
方法只是在循环中调用,所以如果执行程序's not shutting down, it'阻塞,直到给队列一个新任务(无论它来自哪里) .注意:尽管我依赖它们来获得明确的答案 - 我们应该只编写API . 我们不知道
execute()
的实施将如何随着时间而改变 .一个技巧是实现ArrayBlockingQueue的自定义子类并覆盖offer()方法来调用阻塞版本,然后您仍然可以使用正常的代码路径 .
(正如你可能猜到的那样,我认为直接在队列中调用offer是正常的代码路径可能是一个坏主意) .
通过在实例化时指定
RejectedExecutionHandler
,实际上可以在队列满时配置池的行为 .ThreadPoolExecutor
将四个策略定义为内部类,包括AbortPolicy
,DiscardOldestPolicy
,DiscardPolicy
,以及我个人最喜欢的CallerRunsPolicy
,它在控制线程中运行新作业 .例如:
可以使用以下内容获得问题中所需的行为:
在某些时候,必须访问队列 . 这样做的最佳位置是自包含的
RejectedExecutionHandler
,它可以保存在池对象范围内直接操作队列所产生的任何代码重复或潜在错误 . 请注意,ThreadPoolExecutor
中包含的处理程序本身使用getQueue()
.它使用的是与标准内存
LinkedBlockingQueue
或ArrayBlockingQueue
完全不同的实现 .例如,如果你想要像OP一样阻止
offer()
.因此,给定的答案是,必须调用
prestartAllCoreThreads()
(或足够的时间prestartCoreThread()
)以使工作线程可用并运行,这一点非常重要 .如果需要,我们还可以使用一个停车场,将主要处理与被拒绝的任务分开 -