
Why does awaitTermination not reliably return when all tasks on an ExecutorService have completed or been cancelled?


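In my code I submit some tasks to an ExecutorService and then wait for them to complete using shutdown() and awaitTermination(). However, if any one task takes longer than a certain time, I want it cancelled without affecting the other tasks. I use the code from "ExecutorService that interrupts tasks after a timeout", modified as follows: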

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                timeoutExecutor.shutdown();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

and test cases for when the tasks have time to complete and for when they do not; both work as expected:

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}

and in my own code it appears to work:

private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

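But for one customer, despite

---Thread Pool Queue is Empty

being output, awaitTermination() does not return; it only returned when the user cancelled the task two hours later. The full log extract is here: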

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

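So why doesn't awaitTermination() return, even though the log shows that the queue is empty and therefore shutdown() has been called on both the executor itself and the embedded timeoutExecutor?

I have had a few thoughts about this myself, but I don't know the answers.

  • Firstly, why is it actually necessary to shut down the timeoutExecutor for awaitTermination() to return at all? awaitTermination() is not overridden in my subclass, so if all tasks have completed, what does it matter whether the timeoutExecutor (which awaitTermination() knows nothing about) is shut down or not?

  • Secondly, why is "---Thread Pool Queue is Empty" sometimes output more than once?

  • The timeoutExecutor is single-threaded; is that correct/necessary?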

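Update based on Holger's answer

    So the problem you encountered is that you are shutting down the timeoutExecutor way too early, and hence it might miss one or more of the tasks it needs to interrupt among the pending tasks of your thread pool executor.

I now see that an empty queue only means that all tasks have either completed or been started. (Sorry, my example test was misleading: it previously ran more than 10 tasks, which was a temporary edit, and in the production code the number of workers is based on the number of CPUs on the user's machine.)

So you are saying that I call shutdown() on the timeoutExecutor too early (there could still be up to workerSize - 1 tasks running), which means that all timeout tasks still pending for tasks that have not yet completed are interrupted. Consequently, if any of those tasks fails to complete on its own for some reason, its timeout task no longer exists and so cannot be used to interrupt it. But the only reason awaitTermination() wouldn't return would be that one of these (workerSize - 1) tasks did not complete.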

I had already changed afterExecute() myself to

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
    if (isShutdown)
    {
        if(getQueue().isEmpty())
        {

            if(runningTasks.size()==0)
            {
                this.shutdownNow();
            }
        }
    }
}

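To make sure it finishes I use shutdownNow(), but not until everything has completed. Based on your comment, though, this may still not work reliably.

Should I do this instead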

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
}

protected void terminated() 
{
    timeoutExecutor.shutdown();
}

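so that terminated() is called as soon as all submitted tasks have finished (either naturally or by being cancelled by the corresponding timeout task), at which point it no longer matters that the timeoutExecutor still exists?

For completeness, I modified my test case so that the tasks take a very long time unless the timeout task works properly. It shows the original solution failing (hanging) and the revised solution working: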

public void testThreadPoolTasksCancelled() throws Exception
    {
        Instant t1, t2;
        t1 = Instant.now();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(500000000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        t2 = Instant.now();
        System.out.println("Program done:"+(Duration.between(t1, t2).toMillis()/ 1000+ " seconds"));
    }

1 Answer

  • 1

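    The queue only contains jobs that have not been started yet. An empty queue does not imply that there are no pending jobs; they might just have been removed from the queue in order to be executed. Especially in your example code, the assumption that an empty queue implies no running jobs is fatally wrong: since you configured the executor with ten core threads and submit ten jobs, the queue will be empty throughout the entire execution of your example code.

    So the problem you encountered is that you are shutting down the timeoutExecutor way too early, and hence it might miss one or more of the tasks it needs to interrupt among the pending tasks of your thread pool executor.

    Note that, in principle, a job might even be in a state where it has been removed from the queue (if it was ever added) but beforeExecute has not been invoked yet. So even an empty queue together with an empty runningTasks map does not guarantee that there are no pending jobs.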
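To make the first point concrete, here is a minimal sketch (the class name EmptyQueueDemo and its helper method are made up for illustration and are not part of the question's code). It configures a plain ThreadPoolExecutor like the one in the test case, waits until every job has started, and then samples the queue size and the active worker count:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
class EmptyQueueDemo {

    /** Returns {queueSize, activeCount}, sampled while every worker is busy. */
    static int[] sampleWhileBusy(int threads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    started.countDown();      // signal that this job is running
                    try {
                        release.await();      // stay "running" until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();                  // all jobs are now executing
            return new int[] { pool.getQueue().size(), pool.getActiveCount() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();              // let the jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sample = sampleWhileBusy(10);
        System.out.println("queue size = " + sample[0]
                + ", active workers = " + sample[1]);
    }
}
```

With ten core threads and ten jobs, the sample should show an empty queue while all workers are still busy, which is exactly the situation in which the original afterExecute() shut down the timeoutExecutor.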


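    To answer your other question: you have to shut down the timeoutExecutor because it has an associated live thread which will always keep the executor alive. So not shutting it down creates a memory leak and also keeps a thread alive, which always prevents automatic JVM shutdown.

    But the right place to shut down the timeoutExecutor is an override of the method protected void terminated(), which is intended exactly for such cleanup.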
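As a rough sketch of that suggestion, the following is a stripped-down variant of the question's class (the name TerminatedCleanupExecutor, the accessor isTimeoutExecutorShutdown() and the runDemo() helper are assumptions made for this example, and logging is omitted). The only cleanup of the scheduler happens in terminated():

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified variant of the question's TimeoutThreadPoolExecutor.
class TerminatedCleanupExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;
    private final ScheduledExecutorService timeoutExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture<?>> runningTasks =
            new ConcurrentHashMap<>();

    TerminatedCleanupExecutor(int workers, long timeout, TimeUnit timeoutUnit) {
        super(workers, workers, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // Schedule an interrupt of the worker thread once the timeout elapses.
        runningTasks.put(r, timeoutExecutor.schedule(() -> t.interrupt(), timeout, timeoutUnit));
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // The task is done, so its pending timeout is no longer needed.
        ScheduledFuture<?> timeoutTask = runningTasks.remove(r);
        if (timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    @Override
    protected void terminated() {
        // Invoked once the pool has fully terminated: no task can still need a timeout.
        timeoutExecutor.shutdown();
        super.terminated();
    }

    boolean isTimeoutExecutorShutdown() {
        return timeoutExecutor.isShutdown();
    }

    /** Submits tasks that only end when interrupted; true if the pool terminated cleanly. */
    static boolean runDemo() {
        TerminatedCleanupExecutor pool =
                new TerminatedCleanupExecutor(4, 200, TimeUnit.MILLISECONDS);
        for (int i = 0; i < 12; i++) {
            pool.submit(() -> {
                Thread.sleep(60_000);   // far longer than the 200 ms timeout
                return null;
            });
        }
        pool.shutdown();
        try {
            boolean terminated = pool.awaitTermination(30, TimeUnit.SECONDS);
            return terminated && pool.isTimeoutExecutorShutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + runDemo());
    }
}
```

Because the scheduler stays alive until the pool itself has terminated, every task that starts can still get its timeout scheduled, including tasks that were queued when shutdown() was called.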


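    Regarding the last point: it does not matter how many threads you have, but given the simplicity of the timeout tasks there is no benefit in having multiple threads, and a single-threaded executor is the simplest and probably the most efficient solution.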
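A small sketch illustrating this last point, under the assumption that each timeout task does nothing but call Thread.interrupt() (the class SingleSchedulerDemo and its method are invented for this example): one scheduler thread is used to interrupt many sleeping threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the question's code.
class SingleSchedulerDemo {

    /** Starts sleeping threads and returns how many of them were interrupted. */
    static int interruptAll(int threads, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger interrupted = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        try {
            for (int i = 0; i < threads; i++) {
                Thread worker = new Thread(() -> {
                    try {
                        Thread.sleep(60_000);            // only ends early via interrupt
                    } catch (InterruptedException e) {
                        interrupted.incrementAndGet();
                    } finally {
                        done.countDown();
                    }
                });
                worker.setDaemon(true);                  // never block JVM exit
                worker.start();
                // Every timeout is handled by the same single scheduler thread.
                scheduler.schedule(() -> worker.interrupt(), timeoutMillis, TimeUnit.MILLISECONDS);
            }
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdown();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted threads: " + interruptAll(20, 200));
    }
}
```

Each timeout task returns almost immediately, so the single scheduler thread is not a bottleneck even when many timeouts fall due at the same moment.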
