我想将 List<CompletableFuture<X>>
转换为 CompletableFuture<List<T>>
. 这非常有用,因为当您有许多异步任务并且需要获得所有异步任务的结果时 .
如果其中任何一个失败,则最终的未来将失败 . 这就是我实施的方式:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
要运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
如果其中任何一个失败,则失败 . 即使有一百万个期货,它也能按预期产出 . 我遇到的问题是:如果有超过5000个期货,如果其中任何一个失败,我得到一个 StackOverflowError
:
java.util.conmple当前的java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)中的线程“pool-1-thread-2611”java.lang.StackOverflowError中的异常$ ThenCompose.run(CompletableFuture . java:1487)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)at java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java:1487)
我做错了什么?
注意:当任何未来失败时,上述返回的未来将失败 . 接受的答案也应该采取这一点 .
8 回答
除Spotify Futures库外,您可以尝试我的代码在此处找到:https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java(与同一包中的其他类有依赖关系)
它实现了一个逻辑,用于返回“至少N个M”中的CompletionStage-s,其中包含允许允许的错误量 . 对于所有/任何情况都有方便的方法,加上剩余期货的取消政策,加上代码处理CompletionStage-s(接口)而不是CompletableFuture(具体类) .
你可以获得Spotify的
CompletableFutures
库并使用allAsList方法 . 我认为这是's inspired from Guava'的Futures.allAsList方法 .如果您不想使用库,这是一个简单的实现:
Disclaimer: 这不能完全回答最初的问题 . 它将缺少"fail all if one fails"部分 . 但是,我无法回答实际的,更通用的问题,因为它被关闭作为这个问题的副本:Java 8 CompletableFuture.allOf(...) with Collection or List . 所以我会在这里回答:
Summary: 使用以下内容:
用法示例:
Complete Example:
使用
CompletableFuture.allOf(...)
:关于您的实施的一些评论:
您对
.thenComposeAsync
,.thenApplyAsync
和.thenCombineAsync
的使用可能没有达到预期效果 . 这些...Async
方法在一个单独的线程中运行提供给它们的函数 . 因此,在您的情况下,您正在将新项添加到列表中以在提供的执行程序中运行 . 没有必要将轻量级操作填充到缓存的线程执行程序中 . 没有充分理由不要使用thenXXXXAsync
方法 .此外,
reduce
不应用于累积到可变容器中 . 即使流顺序时它可能正常工作,但如果要使流并行,它将失败 . 要执行可变缩减,请改用.collect
.如果要在第一次失败后立即异常完成整个计算,请在
sequence
方法中执行以下操作:另外,如果要在第一次失败时取消剩余的操作,请在
result.completeExceptionally(ex);
之后添加exec.shutdownNow();
. 当然,这假设exec
仅存在于这一计算中 . 如果它没有't, you'必须循环并单独取消每个剩余的Future
.如Misha has pointed out,您正在过度使用
…Async
操作 . 此外,您正在编写一个复杂的操作链,用于建模依赖项,该依赖项不反映您的程序逻辑:您创建的作业x取决于列表的第一个和第二个作业
您创建的作业x 1取决于作业x和列表的第三个作业
您创建的作业x 2取决于作业x 1和列表中的第4个作业
......
您创建的作业x 5000取决于作业x 4999和列表的最后一个作业
然后,可以递归地执行取消(显式地或由于异常)这个递归组合的作业,并且可能会失败
StackOverflowError
. 这是依赖于实现的 .作为already shown by Misha,有一个方法,allOf,它允许您建模您的初衷,定义一个取决于您的列表的所有作业的作业 .
但是,值得注意的是,即使这样也没有必要 . 由于您使用的是无界线程池执行程序,因此您只需将收集结果的异步作业发布到列表中即可完成 . 无论如何,通过询问每项工作的结果来暗示等待完成 .
当线程数量有限且作业可能产生额外的异步作业时,使用编写相关操作的方法很重要,以避免等待作业从必须首先完成的作业中窃取线程,但这里也不是这种情况 .
在这个具体案例中作业只是迭代大量的先决条件作业,并在必要时进行等待可能比对大量依赖项建模更有效,并让每个作业通知从属作业完成 .
在CompletableFuture上使用thenCombine的示例序列操作
如果你不介意使用第三方库cyclops-react(我是作者)有一套CompletableFutures(和Optionals,Streams等)的实用工具方法
为了加上@Misha接受的答案,可以进一步扩展为收藏家:
现在你可以:
Javaslang有一个非常方便的Future API . 它还允许从未来的集合中收集未来 .
见http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-