首页 文章

处理Java ExecutorService任务中的异常

提问于
浏览
182

我'm trying to use Java' s ThreadPoolExecutor 类用固定数量的线程运行大量重量级任务 . 每个任务都有许多地方,在这些地方可能因异常而失败 .

我已经子类化 ThreadPoolExecutor 并且我已经覆盖了 afterExecute 方法,该方法应该提供运行任务时遇到的任何未捕获的异常 . 但是,我似乎无法使其发挥作用 .

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切都很好 - 情况正常!”即使提交给线程池的唯一Runnable也会引发异常 . 有什么线索在这里发生了什么?

谢谢!

11 回答

  • 218

    我正在使用jcabi-log中的VerboseRunnable类,它吞下所有异常并记录它们 . 非常方便,例如:

    import com.jcabi.log.VerboseRunnable;
    scheduler.scheduleWithFixedDelay(
      new VerboseRunnable(
        Runnable() {
          public void run() { 
            // the code, which may throw
          }
        },
        true // it means that all exceptions will be swallowed and logged
      ),
      1, 1, TimeUnit.MILLISECONDS
    );
    
  • 1

    来自docs

    注意:当任务明确地或通过诸如submit之类的方法包含在任务(例如FutureTask)中时,这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法 .

    当你提交一个Runnable时,它将被包含在Future中 .

    你的afterExecute应该是这样的:

    public final class ExtendedExecutor extends ThreadPoolExecutor {
    
        // ...
    
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {
                    Future<?> future = (Future<?>) r;
                    if (future.isDone()) {
                        future.get();
                    }
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                System.out.println(t);
            }
        }
    }
    
  • 0

    WARNING :应该注意,此解决方案将阻止调用线程 .


    如果要处理任务抛出的异常,则通常最好使用 Callable 而不是 Runnable .

    允许 Callable.call() 抛出已检查的异常,并将这些异常传播回调用线程:

    Callable task = ...
    Future future = executor.submit(task);
    try {
       future.get();
    } catch (ExecutionException ex) {
       ex.getCause().printStackTrace();
    }
    

    如果 Callable.call() 抛出异常,则会将其包装在 ExecutionException 中并由 Future.get() 抛出 .

    这可能比继承 ThreadPoolExecutor 更为可取 . 如果异常是可恢复的,它还为您提供重新提交任务的机会 .

  • 6

    这种行为的解释正好在_1692168中:

    注意:当任务明确地或通过诸如submit之类的方法包含在任务(例如FutureTask)中时,这些任务对象会捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法 .

  • -5

    我通过将提交的runnable包装提交给执行程序来解决它 .

    CompletableFuture.runAsync(
    
            () -> {
                    try {
                            runnable.run();
                    } catch (Throwable e) {
                            Log.info(Concurrency.class, "runAsync", e);
                    }
            },
    
            executorService
    );
    
  • 3

    另一种解决方案是使用ManagedTask和ManagedTaskListener .

    您需要一个Callable或Runnable来实现ManagedTask接口 .

    方法 getManagedTaskListener 返回所需的实例 .

    public ManagedTaskListener getManagedTaskListener() {
    

    并在ManagedTaskListener中实现 taskDone 方法:

    @Override
    public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
        if (exception != null) {
            LOGGER.log(Level.SEVERE, exception.getMessage());
        }
    }
    

    有关managed task lifecycle and listener的更多详细信息 .

  • 16

    如果要监视任务的执行,可以旋转1或2个线程(可能更多,具体取决于负载)并使用它们从ExecutionCompletionService包装器中获取任务 .

  • 0

    如果您的 ExecutorService 来自外部源(即,无法继承 ThreadPoolExecutor 并覆盖 afterExecute() ),则可以使用动态代理来实现所需的行为:

    public static ExecutorService errorAware(final ExecutorService executor) {
        return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[] {ExecutorService.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("submit")) {
                        final Object arg0 = args[0];
                        if (arg0 instanceof Runnable) {
                            args[0] = new Runnable() {
                                @Override
                                public void run() {
                                    final Runnable task = (Runnable) arg0;
                                    try {
                                        task.run();
                                        if (task instanceof Future<?>) {
                                            final Future<?> future = (Future<?>) task;
    
                                            if (future.isDone()) {
                                                try {
                                                    future.get();
                                                } catch (final CancellationException ce) {
                                                    // Your error-handling code here
                                                    ce.printStackTrace();
                                                } catch (final ExecutionException ee) {
                                                    // Your error-handling code here
                                                    ee.getCause().printStackTrace();
                                                } catch (final InterruptedException ie) {
                                                    Thread.currentThread().interrupt();
                                                }
                                            }
                                        }
                                    } catch (final RuntimeException re) {
                                        // Your error-handling code here
                                        re.printStackTrace();
                                        throw re;
                                    } catch (final Error e) {
                                        // Your error-handling code here
                                        e.printStackTrace();
                                        throw e;
                                    }
                                }
                            };
                        } else if (arg0 instanceof Callable<?>) {
                            args[0] = new Callable<Object>() {
                                @Override
                                public Object call() throws Exception {
                                    final Callable<?> task = (Callable<?>) arg0;
                                    try {
                                        return task.call();
                                    } catch (final Exception e) {
                                        // Your error-handling code here
                                        e.printStackTrace();
                                        throw e;
                                    } catch (final Error e) {
                                        // Your error-handling code here
                                        e.printStackTrace();
                                        throw e;
                                    }
                                }
                            };
                        }
                    }
                    return method.invoke(executor, args);
                });
    }
    
  • 0

    这是因为 AbstractExecutorService :: submitrunnable 包装成 RunnableFuture (只有 FutureTask ),如下所示

    AbstractExecutorService.java
    
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
        execute(ftask);
        return ftask;
    }
    

    然后 execute 会将其传递给 WorkerWorker.run() 会调用以下内容 .

    ThreadPoolExecutor.java
    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();           /////////HERE////////
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    最后task.run();在上面的代码调用中将调用FutureTask.run() . 这是异常处理程序代码,因此您没有得到预期的异常 .

    class FutureTask<V> implements RunnableFuture<V>
    
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {   /////////HERE////////
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
  • 9

    这有效

    • 它派生自SingleThreadExecutor,但您可以轻松地进行调整

    • Java 8 lamdas代码,但易于修复

    它将创建一个具有单个线程的Executor,可以完成很多任务;并将等待当前的一个结束执行以从下一个开始

    如果出现unaugth错误或异常,则uncaughtExceptionHandler将捕获它

    public final class SingleThreadExecutorWithExceptions {
    
        public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
    
            ThreadFactory factory = (Runnable runnable)  -> {
                final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
                newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                    uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
                });
                return newThread;
            };
            return new FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue(),
                            factory){
    
    
                        protected void afterExecute(Runnable runnable, Throwable throwable) {
                            super.afterExecute(runnable, throwable);
                            if (throwable == null && runnable instanceof Future) {
                                try {
                                    Future future = (Future) runnable;
                                    if (future.isDone()) {
                                        future.get();
                                    }
                                } catch (CancellationException ce) {
                                    throwable = ce;
                                } catch (ExecutionException ee) {
                                    throwable = ee.getCause();
                                } catch (InterruptedException ie) {
                                    Thread.currentThread().interrupt(); // ignore/reset
                                }
                            }
                            if (throwable != null) {
                                uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                            }
                        }
                    });
        }
    
    
    
        private static class FinalizableDelegatedExecutorService
                extends DelegatedExecutorService {
            FinalizableDelegatedExecutorService(ExecutorService executor) {
                super(executor);
            }
            protected void finalize() {
                super.shutdown();
            }
        }
    
        /**
         * A wrapper class that exposes only the ExecutorService methods
         * of an ExecutorService implementation.
         */
        private static class DelegatedExecutorService extends AbstractExecutorService {
            private final ExecutorService e;
            DelegatedExecutorService(ExecutorService executor) { e = executor; }
            public void execute(Runnable command) { e.execute(command); }
            public void shutdown() { e.shutdown(); }
            public List shutdownNow() { return e.shutdownNow(); }
            public boolean isShutdown() { return e.isShutdown(); }
            public boolean isTerminated() { return e.isTerminated(); }
            public boolean awaitTermination(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return e.awaitTermination(timeout, unit);
            }
            public Future submit(Runnable task) {
                return e.submit(task);
            }
            public  Future submit(Callable task) {
                return e.submit(task);
            }
            public  Future submit(Runnable task, T result) {
                return e.submit(task, result);
            }
            public  List> invokeAll(Collection> tasks)
                    throws InterruptedException {
                return e.invokeAll(tasks);
            }
            public  List> invokeAll(Collection> tasks,
                                                 long timeout, TimeUnit unit)
                    throws InterruptedException {
                return e.invokeAll(tasks, timeout, unit);
            }
            public  T invokeAny(Collection> tasks)
                    throws InterruptedException, ExecutionException {
                return e.invokeAny(tasks);
            }
            public  T invokeAny(Collection> tasks,
                                   long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return e.invokeAny(tasks, timeout, unit);
            }
        }
    
    
    
        private SingleThreadExecutorWithExceptions() {}
    }
    
  • 134

    而不是继承ThreadPoolExecutor,我会提供一个创建新线程的ThreadFactory实例并为它们提供UncaughtExceptionHandler

相关问题