Synchronous task producer/consumer using ThreadPoolExecutor

I'm trying to find a way to use a ThreadPoolExecutor in the following scenario:

  • I have a separate thread that produces tasks and submits them to the thread pool

  • task submission is synchronous and blocks until the ThreadPoolExecutor can start the task

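  • at any given time, only a fixed number of tasks may execute in parallel. An unbounded number of tasks running at the same time could exhaust memory.

  • before submitting a task, the producer thread always checks that a certain maximum build time has not been exceeded since the first task was submitted. If it has, the thread shuts down, but any task currently running on the thread pool runs to completion before the application terminates.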

  • when the producer thread terminates, there must be no unstarted task left on the thread pool's queue.
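For comparison, here is a minimal sketch of one common way to get blocking, bounded submission without touching the pool's queue: a java.util.concurrent.Semaphore with one permit per task slot, acquired before submitting and released when the task ends. This technique is not taken from the question or the answers below; the class BoundedSubmitter, its methods, and the use of Executors.newFixedThreadPool are assumptions for illustration. Because tasks go through submit(), a task's exception stays available from its Future rather than being replaced by a CancellationException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not from the question): a Semaphore caps how many tasks run at once,
// and submitUntil blocks the producer until a permit is free or the deadline passes.
final class BoundedSubmitter {
    private final ExecutorService pool;
    private final Semaphore permits;

    BoundedSubmitter(int maxConcurrent) {
        this.pool = Executors.newFixedThreadPool(maxConcurrent);
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Returns the task's Future, or null if no permit became free before deadlineNanos. */
    Future<?> submitUntil(Runnable task, long deadlineNanos) throws InterruptedException {
        long remaining = deadlineNanos - System.nanoTime();
        if (remaining <= 0 || !permits.tryAcquire(remaining, TimeUnit.NANOSECONDS)) {
            return null; // deadline reached: the producer stops submitting
        }
        try {
            return pool.submit(() -> {
                try {
                    task.run(); // an exception here is kept in the Future
                } finally {
                    permits.release(); // free the slot even if the task fails
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // e.g. RejectedExecutionException after shutdown
            throw e;
        }
    }

    /** Stops accepting tasks and lets already-submitted tasks run to completion. */
    void shutdownAndAwait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    /** Submits `tasks` short tasks and reports the highest number observed running at once. */
    static int peakConcurrency(int maxConcurrent, int tasks) {
        BoundedSubmitter submitter = new BoundedSubmitter(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        try {
            for (int i = 0; i < tasks; i++) {
                submitter.submitUntil(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, deadline);
            }
            submitter.shutdownAndAwait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak = " + peakConcurrency(3, 20)); // never more than 3
    }
}
```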

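To give more context, I currently just submit all tasks at once and, once the max build time has expired, cancel all the futures returned by ExecutorService.submit. I ignore all the resulting CancellationExceptions since they are expected. The problem is that the behavior of Future.cancel(false) is odd and unsuited to my use case: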

  • it prevents any unstarted task from running (good)

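  • it does not interrupt currently running tasks and lets them run to completion (good)

  • however, it ignores any exception thrown by the currently running tasks and throws a CancellationException instead, for which Exception.getCause() is null. That is unfortunate, because in this case I want to propagate the exception and report it to some error-handling mechanism.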
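The behavior described above can be reproduced with a small self-contained program. This is a sketch; the class name CancelHidesFailure and the latch-based timing are made up for the demonstration. The task is already running when cancel(false) is called, throws afterwards, and get() still reports only a CancellationException with no cause.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// After cancel(false) succeeds on a task that is already running, get() reports
// CancellationException and the exception the task throws afterwards is discarded.
final class CancelHidesFailure {
    static String outcomeAfterCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFail = new CountDownLatch(1);
        Callable<Void> task = () -> {
            started.countDown();
            mayFail.await(); // still running when cancel(false) is called
            throw new IllegalStateException("task failed after the deadline");
        };
        Future<Void> future = pool.submit(task);
        try {
            started.await();
            future.cancel(false); // does not interrupt the running task
            mayFail.countDown();  // let the task fail now
            future.get();
            return "completed normally";
        } catch (CancellationException e) {
            return "CancellationException, cause=" + e.getCause();
        } catch (ExecutionException e) {
            return "ExecutionException, cause=" + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomeAfterCancel()); // CancellationException, cause=null
    }
}
```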

I looked at the different blocking queues provided by Java and found this: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html. It looked ideal, but then, reading https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html, it seems that ThreadPoolExecutor does not use it the way I want:

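Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.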
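To see the rejection this passage warns about, here is a small sketch (the class name DirectHandoffRejects is hypothetical) with a SynchronousQueue and maximumPoolSize = 1. While the single thread is busy, a second execute() neither queues nor blocks; with the default AbortPolicy it throws RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Direct handoff with a bounded maximumPoolSize: when every thread is busy,
// execute() does not block, it rejects (default policy: AbortPolicy).
final class DirectHandoffRejects {
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { }); // no idle thread and no room to grow
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected()); // true
    }
}
```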

Ideally, the consumer (= the pool) would block on SynchronousQueue.poll and the producer (= the task-producing thread) would block on SynchronousQueue.put.
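For reference, a sketch of the handoff semantics described above, using one hand-written consumer thread instead of a pool (HandoffDemo and handOff are made-up names). The consumer here blocks in take(); a timed poll behaves the same way until its timeout expires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// put() blocks the producer until the consumer, blocked in take(), receives the item,
// so at most one item is "in flight" and nothing is ever buffered.
final class HandoffDemo {
    static List<Integer> handOff(int items) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    received.add(queue.take()); // waits until the producer hands over an item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i); // returns only once the consumer has taken the item
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(5)); // [0, 1, 2, 3, 4]
    }
}
```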

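Any idea how I can implement the scenario I described without writing any complex scheduling logic (which is what ThreadPoolExecutor should take care of for me)?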

2 Answers

  • 1

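    I believe you're on the right track... all you have to do is combine the SynchronousQueue with a RejectedExecutionHandler, using the constructor ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)... This way you can define a thread pool with a fixed maximum size (limiting your resource usage) and a fallback mechanism that reschedules the tasks that could not be handled (because the pool was full)... Example: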

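    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Experiment {
    
        public static final long HANDLER_SLEEP_TIME = 4000;
        public static final int MAX_POOL_SIZE = 1;
    
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<Runnable> queue;
            RejectedExecutionHandler handler;
            ThreadPoolExecutor pool;
            Runnable runA, runB;
    
            queue   = new SynchronousQueue<>();
            // Runs on the submitting thread whenever no pool thread is free to take the task
            handler = new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    if (executor.isShutdown()) {
                        // retrying on a shut-down pool would just be rejected again, forever
                        throw new RejectedExecutionException("Pool is shut down");
                    }
                    try {
                        System.out.println("Handler invoked! Thread: " + Thread.currentThread().getName());
                        Thread.sleep(HANDLER_SLEEP_TIME); // this lets runA (3 s) finish
                        executor.execute(r); // reschedule; r was already wrapped by submit(), so execute it as-is
    
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                        throw new RejectedExecutionException("Handler interrupted", ex);
                    }
                }
            };
    
            // core = max = 1: at most one task runs at a time
            pool = new ThreadPoolExecutor(1, MAX_POOL_SIZE, 10, TimeUnit.SECONDS, queue, handler);
            runA = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.out.println("hello, I'm runnable A");
    
                    } catch (Exception ex) {
                        throw new RuntimeException("RunnableA", ex);
                    }
                }
            };
            runB = new Runnable() {
                @Override
                public void run() {
                    System.out.println("hello, I'm runnable B");
                }
            };
    
            pool.submit(runA); // starts the pool's only thread
            pool.submit(runB); // no free thread: the handler runs on the main thread
            pool.shutdown();
        }
    }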
    

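    Note: how you implement the RejectedExecutionHandler is up to you! I'm only suggesting a sleep as a blocking mechanism, but you can make the logic more sophisticated, for example by asking the thread pool whether it has a free thread: if not, sleep; if so, submit the task again...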
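One possible refinement of that note, sketched under assumptions (this is not the answerer's code, and BlockingHandoffPolicy and runAll are hypothetical names): instead of sleeping for a fixed time, the handler blocks in executor.getQueue().put(r) until a worker thread is ready to take the task. It relies on corePoolSize == maximumPoolSize and on core threads that never time out, so a worker that finishes a task always goes back to waiting in take(). One caveat: if the pool is shut down while a producer is blocked in put(), no worker will ever take the task, so shut the pool down only after the producer has stopped.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Instead of sleeping, block the submitting thread in put() until a worker
// thread is ready to take the task from the SynchronousQueue.
final class BlockingHandoffPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        try {
            executor.getQueue().put(r); // waits for an idle core thread blocked in take()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for a free thread", e);
        }
    }

    /** Runs `tasks` short tasks on `threads` threads and returns how many completed. */
    static int runAll(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new BlockingHandoffPolicy());
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { // blocks here whenever all threads are busy
                try {
                    Thread.sleep(2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // only after the producer loop has finished
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(3, 20) + " tasks completed");
    }
}
```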

  • 0

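    I found another option besides the one proposed by @Carlitos Way. It consists of adding tasks directly to the queue with BlockingQueue.offer. The only reason I couldn't get it to work at first, and had to post this question, is that I didn't know that a ThreadPoolExecutor starts without any threads by default. Threads are created lazily by the thread factory and can be removed or re-created depending on the core and maximum pool sizes and on the number of tasks submitted concurrently.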
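The lazy thread creation is easy to observe with getPoolSize(). A minimal sketch (LazyThreads is a made-up name) comparing the pool size before and after prestartAllCoreThreads(), which returns the number of threads it started:

```java
import java.util.Arrays;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A new ThreadPoolExecutor has no threads; prestartAllCoreThreads() creates
// all core threads eagerly and returns how many it started.
final class LazyThreads {
    static int[] poolSizes(int core) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, core, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        int before = pool.getPoolSize();             // no task submitted yet
        int started = pool.prestartAllCoreThreads(); // threads started by this call
        int after = pool.getPoolSize();
        pool.shutdown();
        return new int[] {before, started, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(poolSizes(4))); // [0, 4, 4]
    }
}
```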

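    Because thread creation is lazy, my attempts to block on the call to offer failed: SynchronousQueue.offer gives up (immediately, or when its timeout elapses) if nobody is waiting to take an element from the queue. Conversely, SynchronousQueue.put blocks until someone takes an item from the queue, which will never happen if the thread pool has no threads.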
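A small sketch of the SynchronousQueue behavior described above (OfferWithoutConsumer is a hypothetical name): with nobody waiting, offer(e) fails at once and the timed offer(e, timeout, unit) fails when its timeout elapses, while a timed offer succeeds once another thread, standing in for an idle pool thread, is blocked in take().

```java
import java.util.Arrays;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// SynchronousQueue only accepts an element when another thread is there to receive it.
final class OfferWithoutConsumer {
    /** Returns {plain offer, timed offer, timed offer with a consumer waiting}. */
    static boolean[] offers() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean[] results = new boolean[3];
        try {
            results[0] = queue.offer("task");                            // nobody waiting: false at once
            results[1] = queue.offer("task", 10, TimeUnit.MILLISECONDS); // false after 10 ms
            Thread consumer = new Thread(() -> {
                try {
                    queue.take(); // plays the role of an idle pool thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            results[2] = queue.offer("task", 5, TimeUnit.SECONDS);       // true once take() is reached
            if (!results[2]) {
                consumer.interrupt(); // don't leave the consumer blocked forever
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(offers())); // [false, false, true]
    }
}
```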

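    So the workaround is to force the thread pool to create its core threads eagerly with ThreadPoolExecutor.prestartAllCoreThreads. My problem then becomes fairly trivial. Here is a simplified version of my real use case: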

    import java.util.Random;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.atomic.AtomicLong;
    
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    import static java.util.concurrent.TimeUnit.SECONDS;
    
    public class SimplifiedBuildScheduler {
        private static final int MAX_POOL_SIZE = 10;
    
        private static final Random random = new Random();
        private static final AtomicLong nextTaskId = new AtomicLong(0);
    
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
    
            // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
            long maxBuildTimeInMillis = 50;
            // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
            long taskSubmissionTimeoutInMillis = 1;
    
            ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
            pool.prestartAllCoreThreads(); // without idle core threads waiting in take(), queue.offer(...) below would always time out
    
            Runnable nextTask = makeTask(maxBuildTimeInMillis);
    
            long millisAtStart = System.currentTimeMillis();
            while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
                // hand the task straight to an idle worker, bypassing pool.execute()
                boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
                if (submitted) {
                    nextTask = makeTask(maxBuildTimeInMillis);
                } else {
                    // makeTask has already incremented nextTaskId, so this prints the pending task's id + 1
                    System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
                            "the max build time has expired");
                }
            }
    
            System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");
    
            pool.shutdown();
            pool.awaitTermination(9999999, SECONDS);
        }
    
        private static Runnable makeTask(long maxBuildTimeInMillis) {
            long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
            long taskId = nextTaskId.getAndIncrement();
            return () -> {
                try {
                    System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
                    Thread.sleep(sleepTimeInMillis);
                    System.out.println("Task " + taskId + " completed !");
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            };
        }
    
        private static int randomSleepTime(long maxBuildTimeInMillis) {
            // voluntarily make it possible that a task finishes after the max build time is expired
            return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
        }
    }
    

    Here is an example of the output:

    Task 1 was not submitted. It will be rescheduled unless the max build time has expired
    Task 0 sleeping for 23 ms
    Task 1 sleeping for 26 ms
    Task 2 sleeping for 6 ms
    Task 3 sleeping for 9 ms
    Task 4 sleeping for 75 ms
    Task 5 sleeping for 35 ms
    Task 6 sleeping for 81 ms
    Task 8 was not submitted. It will be rescheduled unless the max build time has expired
    Task 8 was not submitted. It will be rescheduled unless the max build time has expired
    Task 7 sleeping for 86 ms
    Task 8 sleeping for 47 ms
    Task 9 sleeping for 40 ms
    Task 11 was not submitted. It will be rescheduled unless the max build time has expired
    Task 2 completed !
    Task 10 sleeping for 76 ms
    Task 12 was not submitted. It will be rescheduled unless the max build time has expired
    Task 3 completed !
    Task 11 sleeping for 31 ms
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 13 was not submitted. It will be rescheduled unless the max build time has expired
    Task 0 completed !
    Task 12 sleeping for 7 ms
    Task 14 was not submitted. It will be rescheduled unless the max build time has expired
    Task 14 was not submitted. It will be rescheduled unless the max build time has expired
    Task 1 completed !
    Task 13 sleeping for 40 ms
    Task 15 was not submitted. It will be rescheduled unless the max build time has expired
    Task 12 completed !
    Task 14 sleeping for 93 ms
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 16 was not submitted. It will be rescheduled unless the max build time has expired
    Task 5 completed !
    Task 15 sleeping for 20 ms
    Task 17 was not submitted. It will be rescheduled unless the max build time has expired
    Task 17 was not submitted. It will be rescheduled unless the max build time has expired
    Task 11 completed !
    Task 16 sleeping for 27 ms
    Task 18 was not submitted. It will be rescheduled unless the max build time has expired
    Task 18 was not submitted. It will be rescheduled unless the max build time has expired
    Task 9 completed !
    Task 17 sleeping for 95 ms
    Task 19 was not submitted. It will be rescheduled unless the max build time has expired
    Max build time has expired. Stop submitting new tasks and running existing tasks to completion
    Task 8 completed !
    Task 15 completed !
    Task 13 completed !
    Task 16 completed !
    Task 4 completed !
    Task 6 completed !
    Task 10 completed !
    Task 7 completed !
    Task 14 completed !
    Task 17 completed !
    

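    For example, you will notice that the task reported as "Task 19" was not rescheduled, because the max build time expired before the scheduler tried to offer it to the queue a second time. (The "was not submitted" message prints nextTaskId, which makeTask has already incremented, so each such message shows the pending task's id plus one; this last one actually refers to task 18, which never ran.) You can also see that all in-flight tasks started before the max build time expired run to completion.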

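    Note: as stated in the comment in my code, the max build time is a soft requirement, meaning it may not be met exactly, and my solution does allow a task to be submitted even after the max build time has expired. This happens if a call to offer starts before the max build time expires and completes after it. To reduce the chance of this happening, the timeout used when calling offer must be much smaller than the max build time. In the real system, the thread pool is usually busy with no idle thread, so the probability of this race condition is very small, and when it does occur it has no harmful consequences, because the max build time is a best-effort bound on the overall running time, not an exact and strict constraint.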
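One possible way to shrink that window further, sketched here as an assumption rather than as part of the original solution: cap each offer's timeout at the time remaining before the deadline, so a handoff can only be accepted before the deadline (up to clock and scheduling jitter). DeadlineBoundedProducer, produceUntil, and demo are hypothetical names; the pool is built the same way as in SimplifiedBuildScheduler (core = max, core threads prestarted, the SynchronousQueue shared with the producer).

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Producer loop whose offer timeout never extends past the deadline.
// `pool` must be built on `queue` and have its core threads prestarted.
final class DeadlineBoundedProducer {
    static int produceUntil(ThreadPoolExecutor pool, SynchronousQueue<Runnable> queue,
                            Supplier<Runnable> taskFactory, long maxBuildMillis,
                            long offerTimeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxBuildMillis);
        long offerTimeout = TimeUnit.MILLISECONDS.toNanos(offerTimeoutMillis);
        int submitted = 0;
        Runnable next = taskFactory.get();
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            if (queue.offer(next, Math.min(offerTimeout, remaining), TimeUnit.NANOSECONDS)) {
                submitted++;
                next = taskFactory.get();
            } // otherwise retry the same task until the deadline
        }
        pool.shutdown(); // running tasks finish; a SynchronousQueue has no capacity, so nothing unstarted is left
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return submitted;
    }

    /** Returns {tasks handed off, tasks completed}; the two should be equal. */
    static int[] demo() {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, queue);
        pool.prestartAllCoreThreads();
        AtomicInteger completed = new AtomicInteger();
        Supplier<Runnable> taskFactory = () -> () -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        };
        try {
            int submitted = produceUntil(pool, queue, taskFactory, 50, 1);
            return new int[] {submitted, completed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, -1};
        }
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " handed off, " + counts[1] + " completed");
    }
}
```

Because every successful offer hands the task to a worker that is already waiting, each handed-off task runs to completion after shutdown(), which is why the two counts match.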
