用途
CompletableFuture.allOf(...):
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) );}
关于您的实现的一些评论:
您使用的
.thenComposeAsync,
.thenApplyAsync并且
.thenCombineAsync很可能没有做你的期望。这些
...Async方法在单独的线程中运行提供给它们的函数。因此,在您的情况下,您导致将新项添加到列表中以在提供的执行程序中运行。无需将轻量级 *** 作填充到缓存的线程执行器中。请勿在
thenXXXXAsync无充分理由的情况下使用方法。
另外,
reduce不应用于堆积到易变容器中。即使在流是顺序的流时它可能正确工作,但是如果将流设为并行流,它将失败。要执行可变减少,请
.collect改用。
如果要在第一次失败后立即异常完成整个计算,请在您的
sequence方法中执行以下 *** 作:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) );com.forEach(f -> f.whenComplete((t, ex) -> { if (ex != null) { result.completeExceptionally(ex); }}));return result;
此外,如果您想在第一次失败时取消其余 *** 作,请在
exec.shutdownNow();之后添加
result.completeExceptionally(ex);。当然,这假定
exec仅针对这一计算存在。如果没有,则必须循环遍历并分别取消剩余的
Future每个。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)