首页 文章

执行程序:如果递归创建任务,如何同步等待所有任务完成?

提问于
浏览
12

我的问题与this one here密切相关 . 正如在那里发布的那样,我希望主线程等到工作队列为空并且所有任务都已完成 . 然而,在我的情况下,问题是每个任务可以递归地导致提交新任务以进行处理 . 这使收集所有这些任务的未来变得有点尴尬 .

我们当前的解决方案使用忙等待循环来等待终止:

do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks是一个在创建每个新任务时增加的值 . 这有效但我认为由于忙碌的等待而不是很好 . 我想知道是否有一种好方法可以使主线程同步等待,直到被明确唤醒 .

9 回答

  • 0

    非常感谢你的所有建议!

    最后,我选择了一些我认为相当简单的东西 . 我发现CountDownLatch几乎就是我所需要的 . 它会阻塞,直到计数器达到0.唯一的问题是它只能倒计时,而不是向上,因此在动态设置中不起作用,我可以在任务中提交新任务 . 因此,我实现了一个新类 CountLatch ,它提供了额外的功能 . (见下文)这个课我然后使用如下 .

    主线程调用 latch.awaitZero() ,阻塞直到锁存器达到0 .

    任何线程,在调用 executor.execute(..) 之前调用 latch.increment() .

    在完成之前的任何任务都会调用 latch.decrement() .

    当最后一个任务终止时,计数器将达到0,从而释放主线程 .

    欢迎提供进一步的建议和反馈!

    public class CountLatch {
    
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {
    
        Sync(int count) {
            setState(count);
        }
    
        int getCount() {
            return getState();
        }
    
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    
        protected int acquireNonBlocking(int acquires) {
            // increment count
            for (;;) {
                int c = getState();
                int nextc = c + 1;
                if (compareAndSetState(c, nextc))
                    return 1;
            }
        }
    
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
    private final Sync sync;
    
    public CountLatch(int count) {
        this.sync = new Sync(count);
    }
    
    public void awaitZero() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public void increment() {
        sync.acquireNonBlocking(1);
    }
    
    public void decrement() {
        sync.releaseShared(1);
    }
    
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
    
    }
    

    请注意, increment() / decrement() 调用可以封装到自定义的 Executor 子类中,例如,由Sami Korhonen建议,或者如impl所建议的 beforeExecuteafterExecute . 看这里:

    public class CountingThreadPoolExecutor extends ThreadPoolExecutor {
    
    protected final CountLatch numRunningTasks = new CountLatch(0);
    
    public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    @Override
    public void execute(Runnable command) {
        numRunningTasks.increment();
        super.execute(command);
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        numRunningTasks.decrement();
        super.afterExecute(r, t);
    }
    
    /**
     * Awaits the completion of all spawned tasks.
     */
    public void awaitCompletion() throws InterruptedException {
        numRunningTasks.awaitZero();
    }
    
    /**
     * Awaits the completion of all spawned tasks.
     */
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        numRunningTasks.awaitZero(timeout, unit);
    }
    
    }
    
  • 4

    Java 7提供了一个适合这个名为Phaser的用例的同步器 . 它是CountDownLatch和CyclicBarrier的可重用混合体,可以增加和减少注册方的数量(类似于可递增的CountDownLatch) .

    在此方案中使用移相器的基本模式是在创建时使用移相器执行register任务,并在完成时使用arrive . 当到达方的数量与注册的数量相匹配时,相位器将进入下一阶段,并在进行时通知任何预先通过的线程 .

    这是我创建的等待递归任务完成的示例 . 为了演示目的,它天真地找到Fibonacci序列的前几个数字:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * An example of using a Phaser to wait for the completion of recursive tasks.
     * @author Voxelot
     */
    public class PhaserExample {
        /** Workstealing threadpool with reduced queue contention. */
        private static ForkJoinPool executors;
    
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) throws InterruptedException {
            executors = new ForkJoinPool();
            List<Long> sequence = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                sequence.add(fib(i));
            }
            System.out.println(sequence);
        }
    
        /**
         * Computes the nth Fibonacci number in the Fibonacci sequence.
         * @param n The index of the Fibonacci number to compute
         * @return The computed Fibonacci number
         */
        private static Long fib(int n) throws InterruptedException {
            AtomicLong result = new AtomicLong();
            //Flexible sychronization barrier
            Phaser phaser = new Phaser();
            //Base task
            Task initialTask = new Task(n, result, phaser);
            //Register fib(n) calling thread
            phaser.register();
            //Submit base task
            executors.submit(initialTask);
            //Make the calling thread arrive at the synchronization
            //barrier and wait for all future tasks to arrive.
            phaser.arriveAndAwaitAdvance();
            //Get the result of the parallel computation.
            return result.get();
        }
    
        private static class Task implements Runnable {
            /** The Fibonacci sequence index of this task. */
            private final int index;
            /** The shared result of the computation. */
            private final AtomicLong result;
            /** The synchronizer. */
            private final Phaser phaser;
    
            public Task(int n, AtomicLong result, Phaser phaser) {
                index = n;
                this.result = result;
                this.phaser = phaser;
                //Inform synchronizer of additional work to complete.
                phaser.register();
            }
    
            @Override
            public void run() {
                if (index == 1) {
                    result.incrementAndGet();
                } else if (index > 1) {
                    //recurrence relation: Fn = Fn-1 + Fn-2
                    Task task1 = new Task(index - 1, result, phaser);
                    Task task2 = new Task(index - 2, result, phaser);
                    executors.submit(task1);
                    executors.submit(task2);
                }
                //Notify synchronizer of task completion.
                phaser.arrive();
            }
        }
    }
    
  • 0

    这个实际上是一个很有趣的问题需要解决 . 我必须警告我没有完全测试代码 .

    想法是简单地跟踪任务执行:

    • 如果任务成功排队,则计数器加1

    • 如果任务被取消且尚未执行,则计数器减1

    • 如果已执行任务,则计数器减1

    当调用shutdown并且有待处理的任务时,delegate不会在实际的ExecutorService上调用shutdown . 它将允许排队新任务,直到挂起的任务计数达到零,并在实际的ExecutorService上调用shutdown .

    public class ResilientExecutorServiceDelegate implements ExecutorService {
        private final ExecutorService executorService;
        private final AtomicInteger pendingTasks;
        private final Lock readLock;
        private final Lock writeLock;
        private boolean isShutdown;
    
        public ResilientExecutorServiceDelegate(ExecutorService executorService) {
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            this.pendingTasks = new AtomicInteger();
            this.readLock = readWriteLock.readLock();
            this.writeLock = readWriteLock.writeLock();
            this.executorService = executorService;
            this.isShutdown = false;
        }
    
        private <T> T addTask(Callable<T> task) {
            T result;
            boolean success = false;
            // Increment pending tasks counter
            incrementPendingTaskCount();
            try {
                // Call service
                result = task.call();
                success = true;
            } catch (RuntimeException exception) {
                throw exception;
            } catch (Exception exception) {
                throw new RejectedExecutionException(exception);
            } finally {
                if (!success) {
                    // Decrement pending tasks counter
                    decrementPendingTaskCount();
                }
            }
            return result;
        }
    
        private void incrementPendingTaskCount() {
            pendingTasks.incrementAndGet();
        }
    
        private void decrementPendingTaskCount() {
            readLock.lock();
            if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
                try {
                    // Shutdown
                    executorService.shutdown();
                } catch (Throwable throwable) {
                }
            }
            readLock.unlock();
        }
    
        @Override
        public void execute(final Runnable task) {
            // Add task
            addTask(new Callable<Object>() {
                @Override
                public Object call() {
                    executorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                task.run();
                            } finally {
                                decrementPendingTaskCount();
                            }
                        }
                    });
                    return null;
                }
            });
        }
    
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            // Call service
            return executorService.awaitTermination(timeout, unit);
        }
    
        @Override
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            // It's ok to increment by just one
            incrementPendingTaskCount();
            try {
                return executorService.invokeAll(tasks);
            } finally {
                decrementPendingTaskCount();
            }
        }
    
        @Override
        public <T> List<Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException {
            // It's ok to increment by just one
            incrementPendingTaskCount();
            try {
                return executorService.invokeAll(tasks, timeout, unit);
            } finally {
                decrementPendingTaskCount();
            }
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            // It's ok to increment by just one
            incrementPendingTaskCount();
            try {
                return executorService.invokeAny(tasks);
            } finally {
                decrementPendingTaskCount();
            }
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            incrementPendingTaskCount();
            try {
                return executorService.invokeAny(tasks, timeout, unit);
            } finally {
                decrementPendingTaskCount();
            }
        }
    
        @Override
        public boolean isShutdown() {
            return isShutdown;
        }
    
        @Override
        public boolean isTerminated() {
            return executorService.isTerminated();
        }
    
        @Override
        public void shutdown() {
            // Lock write lock
            writeLock.lock();
            // Set as shutdown
            isShutdown = true;
            try {
                if (pendingTasks.get() == 0) {
                    // Real shutdown
                    executorService.shutdown();
                }
            } finally {
                // Unlock write lock
                writeLock.unlock();
            }
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            // Lock write lock
            writeLock.lock();
            // Set as shutdown
            isShutdown = true;
            // Unlock write lock
            writeLock.unlock();
    
            return executorService.shutdownNow();
        }
    
        @Override
        public <T> Future<T> submit(final Callable<T> task) {
            // Create execution status
            final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
            // Add task
            return addTask(new Callable<Future<T>>() {
                @Override
                public Future<T> call() {
                    return new FutureDelegate<T>(
                            executorService.submit(new Callable<T>() {
                                @Override
                                public T call() throws Exception {
                                    try {
                                        // Mark as executed
                                        futureExecutionStatus.setExecuted();
                                        // Run the actual task
                                        return task.call();
                                    } finally {
                                        decrementPendingTaskCount();
                                    }
                                }
                            }), futureExecutionStatus);
                }
            });
        }
    
        @Override
        public Future<?> submit(final Runnable task) {
            // Create execution status
            final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
            // Add task
            return addTask(new Callable<Future<?>>() {
                @Override
                @SuppressWarnings("unchecked")
                public Future<?> call() {
                    return new FutureDelegate<Object>(
                            (Future<Object>) executorService.submit(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        // Mark as executed
                                        futureExecutionStatus.setExecuted();
                                        // Run the actual task
                                        task.run();
                                    } finally {
                                        decrementPendingTaskCount();
                                    }
                                }
                            }), futureExecutionStatus);
                }
            });
        }
    
        @Override
        public <T> Future<T> submit(final Runnable task, final T result) {
            // Create execution status
            final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
            // Add task
            return addTask(new Callable<Future<T>>() {
                @Override
                public Future<T> call() {
                    return new FutureDelegate<T>(executorService.submit(
                            new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        // Mark as executed
                                        futureExecutionStatus.setExecuted();
                                        // Run the actual task
                                        task.run();
                                    } finally {
                                        decrementPendingTaskCount();
                                    }
                                }
                            }, result), futureExecutionStatus);
                }
            });
        }
    
        private class FutureExecutionStatus {
            private volatile boolean executed;
    
            public FutureExecutionStatus() {
                executed = false;
            }
    
            public void setExecuted() {
                executed = true;
            }
    
            public boolean isExecuted() {
                return executed;
            }
        }
    
        private class FutureDelegate<T> implements Future<T> {
            private Future<T> future;
            private FutureExecutionStatus executionStatus;
    
            public FutureDelegate(Future<T> future,
                    FutureExecutionStatus executionStatus) {
                this.future = future;
                this.executionStatus = executionStatus;
            }
    
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                boolean cancelled = future.cancel(mayInterruptIfRunning);
                if (cancelled) {
                    // Lock read lock
                    readLock.lock();
                    // If task was not executed
                    if (!executionStatus.isExecuted()) {
                        decrementPendingTaskCount();
                    }
                    // Unlock read lock
                    readLock.unlock();
                }
                return cancelled;
            }
    
            @Override
            public T get() throws InterruptedException, ExecutionException {
                return future.get();
            }
    
            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException,
                    ExecutionException, TimeoutException {
                return future.get(timeout, unit);
            }
    
            @Override
            public boolean isCancelled() {
                return future.isCancelled();
            }
    
            @Override
            public boolean isDone() {
                return future.isDone();
            }
        }
    }
    
  • 0

    你为什么不用柜台?例如:

    private AtomicInteger counter = new AtomicInteger(0);
    

    在将任务提交到队列之前将计数器递增1:

    counter.incrementAndGet();
    

    并在任务结束时将其减1:

    counter.decrementAndGet();
    

    检查将是这样的:

    // ...
    while (counter.get() > 0);
    
  • 2

    Java 7通过其ForkJoinPool executor结合了对递归任务的支持 . 它是quite simple to use并且可以很好地扩展,只要任务本身不是太微不足道 . 本质上,它提供了一个受控接口,允许任务等待任何子任务的完成,而不会无限期地阻塞底层线程 .

  • 6

    您链接到的答案中建议的选项之一是使用CompletionService

    您可以使用以下命令替换主线程中的忙等待:

    while (true) {
        Future<?> f = completionService.take(); //blocks until task completes
        if (executor.getQueue().isEmpty()
             && numTasks.longValue() == executor.getCompletedTaskCount()) break;
    }
    

    请注意, getCompletedTaskCount 仅返回一个近似数字,因此您可能需要找到更好的退出条件 .

  • 5

    如果您知道要等待的线程数,可以使用CountDownLatch(http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html)粘贴一行代码来增加每个线程的数量它可以解决您的问题

  • 0

    由于上一个任务不知道它是最后一个,我实际上认为没有记录就可以100%正确地完成这项工作在任务启动和完成任务时 .

    如果内存对我有用, getQueue() 方法将返回一个队列,该队列仅包含仍在等待执行的任务,而不是当前正在运行的任务 . 此外, getCompletedTaskCount() 是近似值 .

    我回答的解决方案和Condition用于发出唤醒主线程的信号(为简单起见,请原谅快捷方式):

    public class MyThreadPoolExecutorState {
    
        public final Lock lock = new ReentrantLock();
        public final Condition workDone = lock.newCondition();
        public boolean workIsDone = false;
    
    }
    
    public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    
        private final MyThreadPoolExecutorState state;
        private final AtomicInteger counter = new AtomicInteger(0);
    
        public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) {
            super(...);
            this.state = state;
        }
    
        protected void beforeExecute(Thread t, Runnable r) {
            this.counter.incrementAndGet();
        }
    
        protected void afterExecute(Runnable r, Throwable t) {
            if(this.counter.decrementAndGet() == 0) {
                this.state.lock.lock();
                try {
                    this.state.workIsDone = true;
                    this.state.workDone.signal();
                }
                finally {
                    this.state.lock.unlock();
                }
            }
        }
    
    }
    
    public class MyApp {
    
        public static void main(...) {
    
            MyThreadPoolExecutorState state = new MyThreadPoolExecutorState();
            MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...);
    
            // Fire ze missiles!
            executor.submit(...);
    
            state.lock.lock();
            try {
                while(state.workIsDone == false) {
                    state.workDone.await();
                }
            }
            finally {
                state.lock.unlock();
            }
    
        }
    
    }
    

    它可能会更优雅(可能只是在你的线程池 Actuator 中提供一个 getState() 或者什么?),但我认为它应该完成工作 . 这也是未经测试的,所以要自己承担责任......

    值得注意的是,如果没有任务要执行,这个解决方案肯定会失败 - 它将无限期地等待信号 . 因此,如果您没有要运行的任务,甚至不必费心启动执行程序 .


    Edit: 第二个想法,增加原子计数器应该在提交时发生,而不是在任务执行之前立即发生(因为排队可能导致计数器过早地降到0) . 替代 submit(...) 方法可能是有意义的,也可能是 remove(...)shutdown() (如果你使用它们) . 但总体思路仍然相同 . (但我想的越多,它就越不漂亮 . )

    我还要查看 class 的内部,看看你是否可以从中收集任何知识:http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java . tryTerminate() 方法看起来很有趣 .

  • 0

    您可以使用原子计数器来计算提交(就像在实际提交之前所说的那样) . 将其与信号量结合并将其释放到 afterExecute 提供的 afterExecute 钩子中 . 在提交第一轮工作后,请致电 semaphore.acquire( counter.get()) 而不是忙碌等待 . 但是,当调用获取时,获取的数量将太小,因为计数器可能会在以后增加 . 您必须循环获取调用,并将自上次调用后的增加作为参数,直到计数器不再增加 .

相关问题