首页 文章

在具有等待任务时动态调整java.util.concurrent.ThreadPoolExecutor的大小

提问于
浏览
15

我正在使用 java.util.concurrent.ThreadPoolExecutor 来并行处理多个项目 . 尽管线程本身工作正常,但由于线程中发生的操作,我们有时会遇到其他资源限制,这使我们想要调低池中线程的数量 .

我'd like to know if there'是一种在线程实际工作时拨打线程数的方法 . 我知道你可以调用 setMaximumPoolSize() 和/或 setCorePoolSize() ,但这些只会在线程空闲后调整池的大小,但是在队列中没有任务等待之前它们不会变为空闲 .

5 回答

  • 6

    据我所知,这是一个不太干净的方式是不可能的 .

    您可以实现beforeExecute方法来检查一些布尔值并强制线程暂时停止 . 请记住,它们将包含一个在重新启用之前不会执行的任务 .

    或者,您可以实现afterExecute以在饱和时抛出RuntimeException . 这将有效地导致Thread死亡,并且由于Executor将高于最大值,因此不会创建新的 .

    我不建议你这样做 . 相反,尝试找到一些其他方法来控制导致问题的任务的并发执行 . 可能通过在具有更有限数量的工作者的单独线程池中执行它们 .

  • 5

    你绝对可以 . 调用 setCorePoolSize(int) 将更改池的核心大小 . 对此方法的调用是线程安全的,并覆盖提供给 ThreadPoolExecutor 的构造函数的设置 . 如果要修剪池大小,其余线程将在其当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭) . 如果要增加池大小,将尽快分配新线程 . 分配新线程的时间范围没有记录 - 但在实现中,每次调用 execute 方法时都会执行新线程的分配 .

    要将其与运行时可调参数作业服务器场配对,您可以将此属性(通过包装器或使用动态MBean导出器)公开为读写JMX属性,以创建一个相当不错的,即时可调的批处理器 .

    要在运行时强制减小池大小(这是您的请求),您必须子类化 ThreadPoolExecutor 并添加对 beforeExecute(Thread,Runnable) 方法的中断 . 中断线程不是一个充分的中断,因为它只与等待状态交互,并且在处理期间,任务线程不会进入可中断状态 .

    我最近遇到了同样的问题,试图在执行所有提交的任务之前强制终止线程池 . 为了实现这一点,我通过在将线程的 UncaughtExceptionHandler 替换为期望我的特定异常并丢弃它的线程之后抛出运行时异常来中断线程 .

    /**
     * A runtime exception used to prematurely terminate threads in this pool.
     */
    static class ShutdownException
    extends RuntimeException {
        ShutdownException (String message) {
            super(message);
        }
    }
    
    /**
     * This uncaught exception handler is used only as threads are entered into
     * their shutdown state.
     */
    static class ShutdownHandler 
    implements UncaughtExceptionHandler {
        private UncaughtExceptionHandler handler;
    
        /**
         * Create a new shutdown handler.
         *
         * @param handler The original handler to deligate non-shutdown
         * exceptions to.
         */
        ShutdownHandler (UncaughtExceptionHandler handler) {
            this.handler = handler;
        }
        /**
         * Quietly ignore {@link ShutdownException}.
         * <p>
         * Do nothing if this is a ShutdownException, this is just to prevent
         * logging an uncaught exception which is expected.  Otherwise forward
         * it to the thread group handler (which may hand it off to the default
         * uncaught exception handler).
         * </p>
         */
        public void uncaughtException (Thread thread, Throwable throwable) {
            if (!(throwable instanceof ShutdownException)) {
                /* Use the original exception handler if one is available,
                 * otherwise use the group exception handler.
                 */
                if (handler != null) {
                    handler.uncaughtException(thread, throwable);
                }
            }
        }
    }
    /**
     * Configure the given job as a spring bean.
     *
     * <p>Given a runnable task, configure it as a prototype spring bean,
     * injecting any necessary dependencices.</p>
     *
     * @param thread The thread the task will be executed in.
     * @param job The job to configure.
     *
     * @throws IllegalStateException if any error occurs.
     */
    protected void beforeExecute (final Thread thread, final Runnable job) {
        /* If we're in shutdown, it's because spring is in singleton shutdown
         * mode.  This means we must not attempt to configure the bean, but
         * rather we must exit immediately (prematurely, even).
         */
        if (!this.isShutdown()) {
            if (factory == null) {
                throw new IllegalStateException(
                    "This class must be instantiated by spring"
                    );
            }
    
            factory.configureBean(job, job.getClass().getName());
        }
        else {
            /* If we are in shutdown mode, replace the job on the queue so the
             * next process will see it and it won't get dropped.  Further,
             * interrupt this thread so it will no longer process jobs.  This
             * deviates from the existing behavior of shutdown().
             */
            workQueue.add(job);
    
            thread.setUncaughtExceptionHandler(
                new ShutdownHandler(thread.getUncaughtExceptionHandler())
                );
    
            /* Throwing a runtime exception is the only way to prematurely
             * cause a worker thread from the TheadPoolExecutor to exit.
             */
            throw new ShutdownException("Terminating thread");
        }
    }
    

    在您的情况下,您可能想要创建一个没有许可证的信号量(仅用作线程安全计数器),并且在关闭线程时向其释放许多许可,这些许可对应于先前核心池大小的增量和新池大小(要求您覆盖 setCorePoolSize(int) 方法) . 这将允许您在当前任务完成后终止线程 .

    private Semaphore terminations = new Semaphore(0);
    
    protected void beforeExecute (final Thread thread, final Runnable job) {
        if (terminations.tryAcquire()) {
            /* Replace this item in the queue so it may be executed by another
             * thread
             */
            queue.add(job);
    
            thread.setUncaughtExceptionHandler(
                new ShutdownHandler(thread.getUncaughtExceptionHandler())
                );
    
            /* Throwing a runtime exception is the only way to prematurely
             * cause a worker thread from the TheadPoolExecutor to exit.
             */
            throw new ShutdownException("Terminating thread");
        }
    }
    
    public void setCorePoolSize (final int size) {
        int delta = getActiveCount() - size;
    
        super.setCorePoolSize(size);
    
        if (delta > 0) {
            terminations.release(delta);
        }
    }
    

    这应该中断n个线程f(n)= active - request . 如果有任何问题, ThreadPoolExecutor 的分配策略相当持久 . 它使用保证执行的 finally 块保持提前终止 . 因此,即使您终止太多线程,它们也会重新填充 .

  • 0

    解决方案是耗尽ThreadPoolExecutor队列,根据需要设置ThreadPoolExecutor大小,然后在其他线程结束时逐个添加线程 . 在ThreadPoolExecutor类中排空队列的方法是私有的,因此您必须自己创建它 . 这是代码:

    /**
     * Drains the task queue into a new list. Used by shutdownNow.
     * Call only while holding main lock.
     */
    public static List<Runnable> drainQueue() {
        List<Runnable> taskList = new ArrayList<Runnable>();
        BlockingQueue<Runnable> workQueue = executor.getQueue();
        workQueue.drainTo(taskList);
        /*
         * If the queue is a DelayQueue or any other kind of queue
         * for which poll or drainTo may fail to remove some elements,
         * we need to manually traverse and remove remaining tasks.
         * To guarantee atomicity wrt other threads using this queue,
         * we need to create a new iterator for each element removed.
         */
        while (!workQueue.isEmpty()) {
            Iterator<Runnable> it = workQueue.iterator();
            try {
                if (it.hasNext()) {
                    Runnable r = it.next();
                    if (workQueue.remove(r))
                        taskList.add(r);
                }
            } catch (ConcurrentModificationException ignore) {
            }
        }
        return taskList;
    }
    

    在调用此方法之前,您需要获取然后释放主锁 . 为此,您需要使用java反射,因为字段“mainLock”是私有的 . 再次,这是代码:

    private Field getMainLock() throws NoSuchFieldException {
        Field mainLock = executor.getClass().getDeclaredField("mainLock");
        mainLock.setAccessible(true);
        return mainLock;
    }
    

    “执行者”是你的ThreadPoolExecutor .

    现在你需要锁定/解锁方法:

    public void lock() {
        try {
            Field mainLock = getMainLock();
            Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
            lock.invoke(mainLock.get(executor), (Object[])null);
        } catch {
            ...
        } 
    }
    
    public void unlock() {
        try {
            Field mainLock = getMainLock();
            mainLock.setAccessible(true);
            Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
            lock.invoke(mainLock.get(executor), (Object[])null);
        } catch {
            ...
        }  
    }
    

    最后,您可以编写“setThreadsNumber”方法,它可以增加和减少ThreadPoolExecutor大小:

    public void setThreadsNumber(int intValue) {
        boolean increasing = intValue > executor.getPoolSize();
        executor.setCorePoolSize(intValue);
        executor.setMaximumPoolSize(intValue);
        if(increasing){
            if(drainedQueue != null && (drainedQueue.size() > 0)){
                executor.submit(drainedQueue.remove(0));
            }
        } else {
            if(drainedQueue == null){
                lock();
                drainedQueue = drainQueue();
                unlock();
            }
        }
    }
    

    注意:显然如果你执行N个并行线程并且你将这个数字改为N-1,那么所有N个线程都会继续跑 . 当第一个线程结束时,不会执行新线程 . 从现在开始,并行线程的数量将是您选择的数量 .

  • 35

    我阅读了setMaximumPoolSize()和setCorePoolSize()的文档,看起来它们可以产生你需要的行为 .

    • 编辑 -

    我的结论是错误的:请参阅下面的讨论了解详情......

  • 0

    我也需要相同的解决方案,似乎在JDK8中setCorePoolSize()和setMaximumPoolSize()确实产生了所需的结果 . 我做了一个测试用例,我向池中提交了4个任务,并且它们执行得非常简单,我在运行时缩小了池的大小,并提交了另一个我想要寂寞的可运行的东西 . 然后我将池恢复到原始大小 . 这是测试源https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

    它产生以下输出(线程“50”是应该孤立执行的那个)

    run:
    test thread 2 enter
    test thread 1 enter
    test thread 3 enter
    test thread 4 enter
    test thread 1 exit
    test thread 2 exit
    test thread 3 exit
    test thread 4 exit
    test thread 50 enter
    test thread 50 exit
    test thread 1 enter
    test thread 2 enter
    test thread 3 enter
    test thread 4 enter
    test thread 1 exit
    test thread 2 exit
    test thread 3 exit
    test thread 4 exit
    

相关问题