首页 文章

是否可以向ThreadPoolExecutor的BlockingQueue添加任务?

提问于
浏览
17

用于ThreadPoolExecutor的JavaDoc不清楚是否可以将任务直接添加到支持执行程序的 BlockingQueue . The docs say调用 executor.getQueue() 是"intended primarily for debugging and monitoring" .

我正用自己的 BlockingQueue 构建一个 ThreadPoolExecutor . 我保留对队列的引用,以便我可以直接向其添加任务 . getQueue() 返回相同的队列,因此我假设 getQueue() 中的警告适用于通过我的方式获取的后备队列的引用 .

示例

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer()vs executor.execute()

据我了解,典型的用途是通过 executor.execute() 添加任务 . 上面示例中的方法具有阻塞队列的优点,而如果队列已满, execute() 会立即失败并拒绝我的任务 . 我也喜欢提交作业与阻塞队列交互;这对我来说更像是 生产环境 者 - 消费者 .

直接向队列添加任务的含义:我必须调用 prestartAllCoreThreads() ,否则没有工作线程正在运行 . 假设没有与执行程序的其他交互,没有任何东西将监视队列(检查 ThreadPoolExecutor 源确认这一点) . 这也意味着直接排队 ThreadPoolExecutor 必须另外配置为> 0个核心线程,并且不得配置为允许核心线程超时 .

tl;博士

鉴于 ThreadPoolExecutor 配置如下:

  • 核心线程> 0

  • 核心线程不允许超时

  • 核心线程是预先启动的

  • 持有对支持执行者的 BlockingQueue 的引用

将任务直接添加到队列而不是调用 executor.execute() 是否可以接受?

相关

这个问题(producer/consumer work queues)类似,但没有具体涉及直接添加到队列 .

5 回答

  • 4

    通过在实例化时指定 RejectedExecutionHandler ,实际上可以在队列满时配置池的行为 . ThreadPoolExecutor 将四个策略定义为内部类,包括 AbortPolicyDiscardOldestPolicyDiscardPolicy ,以及我个人最喜欢的 CallerRunsPolicy ,它在控制线程中运行新作业 .

    例如:

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            nproc, // core size
            nproc, // max size
            60, // idle timeout
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
            new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.
    

    可以使用以下内容获得问题中所需的行为:

    public class BlockingPolicy implements RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            executor.getQueue.put(r); // Self contained, no queue reference needed.
        }
    

    在某些时候,必须访问队列 . 这样做的最佳位置是自包含的 RejectedExecutionHandler ,它可以保存在池对象范围内直接操作队列所产生的任何代码重复或潜在错误 . 请注意, ThreadPoolExecutor 中包含的处理程序本身使用 getQueue() .

  • 0

    如果是我,我宁愿使用 Executor#execute() 而不是 Queue#offer() ,因为我已经使用了 java.util.concurrent 中的所有其他内容 .

    你的问题很好,引起了我的兴趣,所以我看了 ThreadPoolExecutor#execute() 的来源:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
    

    我们可以看到execute本身在工作队列中调用了 offer() ,但是在必要之前没有做一些漂亮,美味的池操作 . 因此,我建议使用 execute() ;不使用它可能(虽然我认为使用 offer() 会破坏执行程序 - 看起来任务是使用以下(也来自ThreadPoolExecutor)从队列中拉出的:

    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
    

    这个 getTask() 方法只是在循环中调用,所以如果执行程序's not shutting down, it'阻塞,直到给队列一个新任务(无论它来自哪里) .

    注意:尽管我依赖它们来获得明确的答案 - 我们应该只编写API . 我们不知道 execute() 的实施将如何随着时间而改变 .

  • 8

    一个技巧是实现ArrayBlockingQueue的自定义子类并覆盖offer()方法来调用阻塞版本,然后您仍然可以使用正常的代码路径 .

    queue = new ArrayBlockingQueue<Runnable>(queueSize) {
      @Override public boolean offer(Runnable runnable) {
        try {
          return offer(runnable, 1, TimeUnit.HOURS);
        } catch(InterruptedException e) {
          // return interrupt status to caller
          Thread.currentThread().interrupt();
        }
        return false;
      }
    };
    

    (正如你可能猜到的那样,我认为直接在队列中调用offer是正常的代码路径可能是一个坏主意) .

  • 11

    它使用的是与标准内存 LinkedBlockingQueueArrayBlockingQueue 完全不同的实现 .

    例如,如果你想要像OP一样阻止 offer() .

    因此,给定的答案是,必须调用 prestartAllCoreThreads() (或足够的时间 prestartCoreThread() )以使工作线程可用并运行,这一点非常重要 .

  • 1

    如果需要,我们还可以使用一个停车场,将主要处理与被拒绝的任务分开 -

    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
        final List<Runnable> taskParking = new LinkedList<Runnable>();
        BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
        RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
                taskCounter.countDown();
                taskParking.add(r);
            }
        };
        ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
        for(int i=0 ; i<TASKCOUNT; i++){
            //main 
            threadPoolExecutor.submit(getRandomTask());
        }
        taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
        System.out.println("Checking the parking lot..." + taskParking);
        while(taskParking.size() > 0){
            Runnable r = taskParking.remove(0);
            System.out.println("Running from parking lot..." + r);
            if(taskParking.size() > LIMIT){
              waitForSometime(...);
            }
            threadPoolExecutor.submit(r);
        }
        threadPoolExecutor.shutdown();
    

相关问题