首页 文章

Java执行程序:如何在任务完成时通知而不阻塞?

提问于
浏览
121

假设我有一个完整的任务队列,我需要提交给执行者服务 . 我希望他们一次处理一个 . 我能想到的最简单的方法是:

  • 从队列中获取任务

  • 将其提交给遗嘱执行人

  • 在返回的Future上调用.get并阻塞,直到结果可用

  • 从队列中取出另一个任务......

但是,我试图完全避免阻塞 . 如果我有10,000个这样的队列,需要一次处理一个任务,我将耗尽堆栈空间,因为它们中的大多数将保持被阻塞的线程 .

我想要的是提交一个任务并提供一个在任务完成时调用的回调 . 我将使用该回叫通知作为发送下一个任务的标志 . (functionaljava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)

如何使用JDK的java.util.concurrent,而不是编写自己的 Actuator 服务?

(向我提供这些任务的队列本身可能会阻塞,但这是一个需要解决的问题)

11 回答

  • 4

    只是为了补充Matt的答案,这有助于,这是一个更加充实的例子来展示回调的使用 .

    private static Primes primes = new Primes();
    
    public static void main(String[] args) throws InterruptedException {
        getPrimeAsync((p) ->
            System.out.println("onPrimeListener; p=" + p));
    
        System.out.println("Adios mi amigito");
    }
    public interface OnPrimeListener {
        void onPrime(int prime);
    }
    public static void getPrimeAsync(OnPrimeListener listener) {
        CompletableFuture.supplyAsync(primes::getNextPrime)
            .thenApply((prime) -> {
                System.out.println("getPrimeAsync(); prime=" + prime);
                if (listener != null) {
                    listener.onPrime(prime);
                }
                return prime;
            });
    }
    

    输出是:

    getPrimeAsync(); prime=241
        onPrimeListener; p=241
        Adios mi amigito
    
  • 1

    定义回调接口以接收要在完成通知中传递的任何参数 . 然后在任务结束时调用它 .

    您甚至可以为Runnable任务编写一般包装器,并将它们提交给 ExecutorService . 或者,请参阅下面的Java 8内置机制 .

    class CallbackTask implements Runnable {
    
      private final Runnable task;
    
      private final Callback callback;
    
      CallbackTask(Runnable task, Callback callback) {
        this.task = task;
        this.callback = callback;
      }
    
      public void run() {
        task.run();
        callback.complete();
      }
    
    }
    

    通过CompletableFuture,Java 8包含了一种更复杂的方法来组成流程,其中流程可以异步和有条件地完成 . 这是一个人为但完整的通知示例 .

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class GetTaskNotificationWithoutBlocking {
    
      public static void main(String... argv) throws Exception {
        ExampleService svc = new ExampleService();
        GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
        CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
        f.thenAccept(listener::notify);
        System.out.println("Exiting main()");
      }
    
      void notify(String msg) {
        System.out.println("Received message: " + msg);
      }
    
    }
    
    class ExampleService {
    
      String work() {
        sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
        char[] str = new char[5];
        ThreadLocalRandom current = ThreadLocalRandom.current();
        for (int idx = 0; idx < str.length; ++idx)
          str[idx] = (char) ('A' + current.nextInt(26));
        String msg = new String(str);
        System.out.println("Generated message: " + msg);
        return msg;
      }
    
      public static void sleep(long average, TimeUnit unit) {
        String name = Thread.currentThread().getName();
        long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
        System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
        try {
          unit.sleep(timeout);
          System.out.println(name + " awoke.");
        } catch (InterruptedException abort) {
          Thread.currentThread().interrupt();
          System.out.println(name + " interrupted.");
        }
      }
    
      public static long exponential(long avg) {
        return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
      }
    
    }
    
  • 14

    使用Guava's listenable future API并添加回调 . 参看来自网站:

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    });
    
  • 46

    在Java 8中,您可以使用CompletableFuture . 这里's an example I had in my code where I'm使用它来从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个GUI应用程序):

    CompletableFuture.supplyAsync(
                userService::listUsers
        ).thenApply(
                this::mapUsersToUserViews
        ).thenAccept(
                this::updateView
        ).exceptionally(
                throwable -> { showErrorDialogFor(throwable); return null; }
        );
    

    它以异步方式执行 . 我使用两种私有方法: mapUsersToUserViewsupdateView .

  • 1

    您可以扩展 FutureTask 类,并覆盖 done() 方法,然后将 FutureTask 对象添加到 ExecutorService ,这样_908709_方法将在 FutureTask 立即完成时调用 .

  • 24

    ThreadPoolExecutor 也有 beforeExecuteafterExecute 钩子方法,您可以覆盖和使用它们 . 以下是 ThreadPoolExecutorJavadocs中的描述 .

    Hook方法此类提供在每个任务执行之前和之后调用的受保护的可覆盖的beforeExecute(java.lang.Thread,java.lang.Runnable)和afterExecute(java.lang.Runnable,java.lang.Throwable)方法 . 这些可以用来操纵执行环境;例如,重新初始化ThreadLocals,收集统计信息或添加日志条目 . 此外,可以重写方法terminate()以执行Executor完全终止后需要执行的任何特殊处理 . 如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止 .

  • 6

    使用CountDownLatch .

    它来自 java.util.concurrent ,它正是在继续之前等待多个线程完成执行的方式 .

    为了实现您正在寻找的回调效果,这需要一些额外的额外工作 . 也就是说,在一个使用 CountDownLatch 的单独线程中自己处理它并等待它,然后继续通知你需要通知什么 . 回调没有本机支持,或类似于该效果的任何东西 .


    EDIT: 现在我进一步理解了你的问题,我认为你走得太远了,不必要的 . 如果你定期SingleThreadExecutor,给它所有的任务,它将本机排队 .

  • 0

    如果要确保不会同时运行任何任务,请使用SingleThreadedExecutor . 任务将按提交的顺序处理 . 您甚至不需要执行任务,只需将它们提交给执行官即可 .

  • 124

    使用 ExecutorService 实现 Callback 机制的简单代码

    import java.util.concurrent.*;
    import java.util.*;
    
    public class CallBackDemo{
        public CallBackDemo(){
            System.out.println("creating service");
            ExecutorService service = Executors.newFixedThreadPool(5);
    
            try{
                for ( int i=0; i<5; i++){
                    Callback callback = new Callback(i+1);
                    MyCallable myCallable = new MyCallable((long)i+1,callback);
                    Future<Long> future = service.submit(myCallable);
                    //System.out.println("future status:"+future.get()+":"+future.isDone());
                }
            }catch(Exception err){
                err.printStackTrace();
            }
            service.shutdown();
        }
        public static void main(String args[]){
            CallBackDemo demo = new CallBackDemo();
        }
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        Callback callback;
        public MyCallable(Long val,Callback obj){
            this.id = val;
            this.callback = obj;
        }
        public Long call(){
            //Add your business logic
            System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
            callback.callbackMethod();
            return id;
        }
    }
    class Callback {
        private int i;
        public Callback(int i){
            this.i = i;
        }
        public void callbackMethod(){
            System.out.println("Call back:"+i);
            // Add your business logic
        }
    }
    

    输出:

    creating service
    Callable:1:pool-1-thread-1
    Call back:1
    Callable:3:pool-1-thread-3
    Callable:2:pool-1-thread-2
    Call back:2
    Callable:5:pool-1-thread-5
    Call back:5
    Call back:3
    Callable:4:pool-1-thread-4
    Call back:4
    

    主要说明:

    • 如果要按FIFO顺序依次处理过程任务,请将 newFixedThreadPool(5) 替换为 newFixedThreadPool(1)

    • 如果要在分析上一个任务的 callback 结果后处理下一个任务,只需在下面取消注释

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
    • 您可以用其中一个替换 newFixedThreadPool()
    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    取决于您的使用案例 .

    • 如果要异步处理回调方法

    一个 . 将共享的 ExecutorService or ThreadPoolExecutor 传递给Callable任务

    湾将 Callable 方法转换为 Callable/Runnable 任务

    C . 将回调任务推送到 ExecutorService or ThreadPoolExecutor

  • 1

    这是Pache 's answer using Guava' s ListenableFuture 的扩展 .

    特别是, Futures.transform() 返回 ListenableFuture ,因此可用于链接异步调用 . Futures.addCallback() 返回 void ,因此无法使用用于链接,但有助于在异步完成时处理成功/失败 .

    // ListenableFuture1: Open Database
    ListenableFuture<Database> database = service.submit(() -> openDatabase());
    
    // ListenableFuture2: Query Database for Cursor rows
    ListenableFuture<Cursor> cursor =
        Futures.transform(database, database -> database.query(table, ...));
    
    // ListenableFuture3: Convert Cursor rows to List<Foo>
    ListenableFuture<List<Foo>> fooList =
        Futures.transform(cursor, cursor -> cursorToFooList(cursor));
    
    // Final Callback: Handle the success/errors when final future completes
    Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
      public void onSuccess(List<Foo> foos) {
        doSomethingWith(foos);
      }
      public void onFailure(Throwable thrown) {
        log.error(thrown);
      }
    });
    

    NOTE: 除了链接异步任务外, Futures.transform() 还允许您在单独的执行程序上安排每个任务(本例中未显示) .

  • 45

    您可以使用Callable的实现

    public class MyAsyncCallable<V> implements Callable<V> {
    
        CallbackInterface ci;
    
        public MyAsyncCallable(CallbackInterface ci) {
            this.ci = ci;
        }
    
        public V call() throws Exception {
    
            System.out.println("Call of MyCallable invoked");
            System.out.println("Result = " + this.ci.doSomething(10, 20));
            return (V) "Good job";
        }
    }
    

    CallbackInterface是非常基本的东西

    public interface CallbackInterface {
        public int doSomething(int a, int b);
    }
    

    现在主要类看起来像这样

    ExecutorService ex = Executors.newFixedThreadPool(2);
    
    MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
    ex.submit(mac);
    

相关问题