首页 文章

针对Reactor(或Akka)解决方案的CompletableFuture解决方案

提问于
浏览
2

我有以下方法使用 CompletableFuture 这样:

public AClass aMethod() {

    CompletableFuture<SomeClassA> someClassAFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsA());
    CompletableFuture<SomeClassB> someClassBFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsB());
    CompletableFuture<SomeClassC> someClassCFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsC());

    CompletableFuture.allOf(someClassAFuture, someClassBFuture, someClassCFuture).join();

    return new AClass(someClassAFuture.join(), someClassBFuture.join(), someClassCFuture.join());
}

如果fork连接池中的线程数少于 T * 3 ,则 T 线程同时进入该方法时此代码存在死锁问题(因为没有 allOf 调用可以完成,并且它们不会将当前获取的线程返回到池中) .

我找到解决这个问题的唯一方法是限制方法中的同时线程(使用Spring的 @Async 注释与线程执行程序)或增加fork连接池中的线程 .

我想要一些更好的解决方案,我可以完全忘记线程池大小 . 如何使用Reactor或Akka重写它?

1 回答

  • 0

    在Akka期货的实施将是(完全未经测试):

    Future< SomeClassA > f1 = future(() -> someMethodThatReturnsA(), system.dispatcher());
    Future< SomeClassB > f2 = future(() -> someMethodThatReturnsB(), system.dispatcher());
    Future< SomeClassC > f3 = future(() -> someMethodThatReturnsC(), system.dispatcher());
    
    List<Future<Object>> futures = Arrays.asList(f1, f2, f3);
    
    return sequence(futures).map((results) ->  new AClass(results.get(0),results.get(1),results.get(2)));
    

    在创建 AClass 之前,可能需要一些额外的工作来解析期货结果 . 请注意,您现在在 aMethod 中返回 Future< AClass >

    但是,代码的问题在于它是阻塞的 . 您是否尝试使用 thenApplythenCompose 加入所有CompletableFutures以返回 CompletableFuture<AClass>

相关问题