首页 文章

如何使用此自定义ExecutorService正常关闭?

提问于
浏览
0

我是我的代码我将一些任务提交给ExecutorService,然后等待它们使用shutdown()和awaitTermination()完成 . 但是,如果任何一个任务花费的时间超过一定时间,我希望它在不影响其他任务的情况下取消 . 我使用ExecutorService that interrupts tasks after a timeout中的代码修改代码如下:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                //timeoutExecutor.shutdownNow();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

以及任务有时间完成以及何时不能同时工作的测试用例

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}

并在我的代码中似乎工作:

private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

但对于一个客户来说,尽管如此

---Thread Pool Queue is Empty

输出awaitTermination()不返回,仅在用户两小时后取消任务时才返回 - 完全日志提取在这里

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

那么即使日志显示队列为空,awaiterTermination()也不会返回,因此在Executor本身和嵌入式timeoutExecutor上都调用了shutdown()?

我自己也有过一些想法,但不知道答案 .

  • 首先,为什么实际上必须关闭TimeOutExecutor以使awaitTermination()无论如何返回 . 在我的子类中,awaitTermination()没有被覆盖,所以如果所有任务都已完成,那么如果TiumeOutExecutor(awaitTermination()一无所知就是关闭是否重要)

  • 其次为什么---线程池队列为空有时会多次输出

1 回答

  • 1

    我在 TimeoutThreadPoolExecutor 进行了自定义修改,它运行正常 .

    public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
    {
        private final long timeout;
        private final TimeUnit timeoutUnit;
        private boolean isShutdown = false;
    
        private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
        private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
    
        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }
    
        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }
    
        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }
    
        public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
        }
    
        @Override
        public void shutdown() {
            isShutdown = true;
            super.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            timeoutExecutor.shutdownNow();
            return super.shutdownNow();
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            if(timeout > 0) {
                final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
                runningTasks.put(r, scheduled);
            }
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            ScheduledFuture timeoutTask = runningTasks.remove(r);
            if(timeoutTask != null) {
                timeoutTask.cancel(false);
            }
            if (isShutdown) timeoutExecutor.shutdown();
        }
    
        class TimeoutTask implements Runnable {
            private final Thread thread;
    
            public TimeoutTask(Thread thread) {
                this.thread = thread;
            }
    
            @Override
            public void run() {
                thread.interrupt();
                System.out.println("Cancelled");
            }
        }
    }
    

    Case 1 : No timeout

    final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
        100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        6, TimeUnit.SECONDS);
    executorService.submit(new Callable<Object>()
    {
        @Override
        public Object call() throws Exception
        {
            Thread.sleep(5000);
            System.out.println("Done");
            return null;
        }
    
    });
    
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("Program done");
    

    它打印:

    Task done
    Program done
    

    Case 2 : Timeout

    final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
        100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        3, TimeUnit.SECONDS);
    executorService.submit(new Callable<Object>()
    {
        @Override
        public Object call() throws Exception
        {
            Thread.sleep(5000);
            System.out.println("Task done");
            return null;
        }
    
    });
    
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.DAYS);
    System.out.println("Program done");
    

    它打印:

    Cancelled
    Program done
    

相关问题