首页 文章

执行者框架 - 生产环境 者消费者模式

提问于
浏览
1

Java_author在5.3.1节中提到了

...许多 生产环境 者 - 消费者设计可以使用Executor任务执行框架来表达,该框架本身使用 生产环境 者 - 消费者模式 . ... 生产环境 者 - 消费者模式提供了一种线程友好的方法,可以将问题分解为更简单的组件(如果可能) .


Executor框架实现是否内部遵循 生产环境 者 - 消费者模式?

如果是,那么 生产环境 者 - 消费者模式的概念如何有助于实施Executor框架?

2 回答

  • 1

    检查ThreadPoolExecutor的实施

    public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    现在检查

    private boolean addWorker(Runnable firstTask, boolean core) {
         // After some checks, it creates Worker and start the thread
        Worker w = new Worker(firstTask);
        Thread t = w.thread;
    
       // After some checks, thread has been started
       t.start();
    }
    

    执行 Worker

    /**
         * Class Worker mainly maintains interrupt control state for
         * threads running tasks, along with other minor bookkeeping.
         * This class opportunistically extends AbstractQueuedSynchronizer
         * to simplify acquiring and releasing a lock surrounding each
         * task execution.  This protects against interrupts that are
         * intended to wake up a worker thread waiting for a task from
         * instead interrupting a task being run.  We implement a simple
         * non-reentrant mutual exclusion lock rather than use ReentrantLock
         * because we do not want worker tasks to be able to reacquire the
         * lock when they invoke pool control methods like setCorePoolSize.
         */
        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
    
          /** Delegates main run loop to outer runWorker  */
           public void run() {
                runWorker(this);
           }
    
        final void runWorker(Worker w) {
              Runnable task = w.firstTask;
              w.firstTask = null;
              boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    

    要执行哪个 Runnable 取决于以下逻辑 .

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
         // After some checks, below code returns Runnable
    
          try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
    }
    

    综上所述:

    • Producerexecute API中添加 RunnableCallableworkQueue.offer(command)

    • execute() 方法根据需要创建 Worker 线程

    • 这个 Worker 线程以无限循环运行 . 它从 getTask() 获取任务(例如 Runnable

    BlockingQueue<Runnable> workQueue)

    • getTask() 个池并取 Runnable . 它是 consumerBlockingQueue .

    Executor框架实现是否内部遵循 生产环境 者 - 消费者模式?

    是的,如上所述 .

    如果是, 生产环境 者 - 消费者模式的概念如何帮助实施Executor框架?

    BlockingQueue 实现如 ArrayBlockingQueueExecutorService implementation ThreadPoolExecutor 是线程安全的 . 程序员明确实现同步,等待和通知调用以实现相同的开销已经减少了 .

  • 1

    Executor framework 使用 producer-consumer 模式 .

    来自维基百科,

    在计算中, 生产环境 者 - 消费者问题(也称为有界缓冲问题)是多进程同步问题的典型示例 . 该问题描述了两个进程, 生产环境 者和使用者,他们共享一个用作队列的通用固定大小缓冲区 . 生产环境 者的工作是生成数据,将其放入缓冲区,然后重新开始 . 同时,消费者一次一件地消费数据(即,将其从缓冲器中移除) . 问题是确保 生产环境 者不会尝试将数据添加到缓冲区(如果已满)并且消费者不会尝试从空缓冲区中删除数据 .

    如果我们看一下不同的 ExecutorService framework 实现,更具体地说是 ThreadPoolExecutor 类,它基本上有以下内容:

    • 提交和保留作业的队列

    • 使用提交到队列的任务的线程数 .

    根据执行程序服务的类型,这些参数会发生变化

    例如,

    • 固定线程池使用 LinkedBlockingQueue 和用户配置的没有线程

    • 缓存线程池根据提交的任务数使用 SynchronousQueue0Integer.MAX_VALUE 之间没有线程

相关问题