首页 文章

等到任何Future <T>完成

提问于
浏览
53

我有很少的异步任务在运行,我需要等到它们中的至少一个完成(将来我可能需要等待N个任务中的util M完成) . 目前他们被呈现为未来,所以我需要类似的东西

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

有这样的吗?或类似的东西,对于Future来说不是必需的 . 目前我循环收集期货,检查是否已完成,然后再睡一段时间并再次检查 . 这看起来不是最好的解决方案,因为如果我长时间睡眠会增加不必要的延迟,如果我短时间睡眠则会影响性能 .

我可以尝试使用

new CountDownLatch(1)

并在任务完成时减少倒计时并执行

countdown.await()

,但我发现只有控制未来创造才有可能 . 这是可能的,但需要系统重新设计,因为当前创建任务的逻辑(将Callable发送到ExecutorService)与决定等待哪个Future分开 . 我也可以覆盖

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

并创建RunnableFuture的自定义实现,能够附加侦听器以在任务完成时得到通知,然后将此类侦听器附加到所需任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService重写newTaskFor - 并且可能会有实现它不会扩展AbstractExecutorService . 我也可以尝试包装ExecutorService用于同样的目的,但是我必须装饰所有 生产环境 Futures的方法 .

所有这些解决方案都可行,但看起来非常不自然 . 看起来我错过了一些简单的东西,比如

WaitHandle.WaitAny(WaitHandle[] waitHandles)

在c#中 . 是否存在针对此类问题的众所周知的解决方案?

更新:

最初我根本没有访问Future创建,所以没有优雅的解决方案 . 重新设计系统后,我可以访问Future创建,并能够将countDownLatch.countdown()添加到执行过程,然后我可以countDownLatch.await(),一切正常 . 感谢其他答案,我不知道ExecutorCompletionService,它确实可以在类似的任务中有所帮助,但在这种特殊情况下它无法使用,因为一些Futures是在没有任何 Actuator 的情况下创建的 - 实际任务是通过网络发送到另一台服务器的,远程完成并收到完成通知 .

7 回答

  • 6

    为什么不创建结果队列并等待队列?或者更简单地说,使用CompletionService,因为它就是这样:ExecutorService结果队列 .

  • 55

    据我所知,Java与 WaitHandle.WaitAny 方法没有类似的结构 .

    在我看来,这可以通过“WaitableFuture”装饰器来实现:

    public WaitableFuture<T>
        extends Future<T>
    {
        private CountDownLatch countDownLatch;
    
        WaitableFuture(CountDownLatch countDownLatch)
        {
            super();
    
            this.countDownLatch = countDownLatch;
        }
    
        void doTask()
        {
            super.doTask();
    
            this.countDownLatch.countDown();
        }
    }
    

    虽然这只有在可以在执行代码之前插入时才有效,否则执行代码将没有新的 doTask() 方法 . 但是如果你不能在执行之前以某种方式获得对Future对象的控制,我真的看不到没有轮询的方法 .

    或者如果未来总是在自己的线程中运行,那么你可以以某种方式获得该线程 . 然后你可以产生一个新的线程来加入彼此的线程,然后在连接返回后处理等待机制...这将是非常丑陋的,但会引起很多开销 . 如果某些Future对象没有完成,那么根据死线程可能会有很多被阻塞的线程 . 如果你不小心,这可能会泄漏内存和系统资源 .

    /**
     * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
     */
    public static joinAny(Collection<Thread> threads, int numberToWaitFor)
    {
        CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
    
        foreach(Thread thread in threads)
        {
            (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
        }
    
        countDownLatch.await();
    }
    
    class JoinThreadHelper
        implements Runnable
    {
        Thread thread;
        CountDownLatch countDownLatch;
    
        JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
        {
            this.thread = thread;
            this.countDownLatch = countDownLatch;
        }
    
        void run()
        {
            this.thread.join();
            this.countDownLatch.countDown();
        }
    }
    
  • 9

    简单,看看ExecutorCompletionService .

  • 4
  • 0

    使用wait()和notifyAll()实际上非常简单 .

    首先,定义一个锁对象 . (你可以使用任何类,但我喜欢明确):

    package com.javadude.sample;
    
    public class Lock {}
    

    接下来,定义您的工作线程 . 他完成处理后必须通知锁定对象 . 请注意,通知必须位于锁定对象上的同步块锁定中 .

    package com.javadude.sample;
    
    public class Worker extends Thread {
        private Lock lock_;
        private long timeToSleep_;
        private String name_;
        public Worker(Lock lock, String name, long timeToSleep) {
            lock_ = lock;
            timeToSleep_ = timeToSleep;
            name_ = name;
        }
        @Override
        public void run() {
            // do real work -- using a sleep here to simulate work
            try {
                sleep(timeToSleep_);
            } catch (InterruptedException e) {
                interrupt();
            }
            System.out.println(name_ + " is done... notifying");
            // notify whoever is waiting, in this case, the client
            synchronized (lock_) {
                lock_.notify();
            }
        }
    }
    

    最后,你可以写你的客户:

    package com.javadude.sample;
    
    public class Client {
        public static void main(String[] args) {
            Lock lock = new Lock();
            Worker worker1 = new Worker(lock, "worker1", 15000);
            Worker worker2 = new Worker(lock, "worker2", 10000);
            Worker worker3 = new Worker(lock, "worker3", 5000);
            Worker worker4 = new Worker(lock, "worker4", 20000);
    
            boolean started = false;
            int numNotifies = 0;
            while (true) {
                synchronized (lock) {
                    try {
                        if (!started) {
                            // need to do the start here so we grab the lock, just
                            //   in case one of the threads is fast -- if we had done the
                            //   starts outside the synchronized block, a fast thread could
                            //   get to its notification *before* the client is waiting for it
                            worker1.start();
                            worker2.start();
                            worker3.start();
                            worker4.start();
                            started = true;
                        }
                        lock.wait();
                    } catch (InterruptedException e) {
                        break;
                    }
                    numNotifies++;
                    if (numNotifies == 4) {
                        break;
                    }
                    System.out.println("Notified!");
                }
            }
            System.out.println("Everyone has notified me... I'm done");
        }
    }
    
  • 7

    既然你不关心哪一个完成,为什么不只是为所有线程都有一个WaitHandle并等待呢?无论哪一个首先完成都可以设置手柄 .

  • -1

    看到这个选项:

    public class WaitForAnyRedux {
    
    private static final int POOL_SIZE = 10;
    
    public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {
    
        List<Callable<T>> callables = new ArrayList<Callable<T>>();
        for (final T t : collection) {
            Callable<T> callable = Executors.callable(new Thread() {
    
                @Override
                public void run() {
                    synchronized (t) {
                        try {
                            t.wait();
                        } catch (InterruptedException e) {
                        }
                    }
                }
            }, t);
            callables.add(callable);
        }
    
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
        ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
        return executorService.invokeAny(callables);
    }
    
    static public void main(String[] args) throws InterruptedException, ExecutionException {
    
        final List<Integer> integers = new ArrayList<Integer>();
        for (int i = 0; i < POOL_SIZE; i++) {
            integers.add(i);
        }
    
        (new Thread() {
            public void run() {
                Integer notified = null;
                try {
                    notified = waitForAny(integers);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println("notified=" + notified);
            }
    
        }).start();
    
    
        synchronized (integers) {
            integers.wait(3000);
        }
    
    
        Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
        System.out.println("Waking up " + randomInt);
        synchronized (randomInt) {
            randomInt.notify();
        }
      }
    }
    

相关问题