网站建设与维护方式,百度站长工具综合查询,长春seo公司长春网站设计,电源网站模版Java 8 有大量的新特性和增强如 Lambda 表达式#xff0c;Streams#xff0c;CompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture#xff1f;
在Java中CompletableFuture用于异步编程#xff0c;异步编…Java 8 有大量的新特性和增强如 Lambda 表达式StreamsCompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture
在Java中CompletableFuture用于异步编程异步编程是编写非阻塞的代码运行的任务在一个单独的线程与主线程隔离并且会通知主线程它的进度成功或者失败。
在这种方式中主线程不会被阻塞不需要一直等到子线程完成。主线程可以并行的执行其他任务。
使用这种并行方式可以极大的提高程序的性能。
Future vs CompletableFuture
CompletableFuture 是 Future API的扩展。
Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时get() 方法用来接收计算任务的结果。
从 Callbale和 Future 教程可以学习更多关于 Future 知识.
Future API 是非常好的 Java 异步编程进阶但是它缺乏一些非常重要和有用的特性。
Future 的局限性
不能手动完成 当你写了一个函数用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时你把它允许在一个独立的线程中并且从你的函数中返回一个 Future。现在假设这个API服务宕机了这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。Future 的结果在非阻塞的情况下不能执行更进一步的操作 Future 不会通知你它已经完成了它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数当 Future 结果可用的时候用该回调函数自动的调用 Future 的结果。多个 Future 不能串联在一起组成链式调用 有时候你需要执行一个长时间运行的计算任务并且当计算任务完成的时候你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。不能组合多个 Future 的结果 假设你有10个不同的Future你想并行的运行然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。没有异常处理 Future API 没有任务的异常处理结构居然有如此多的限制幸好我们有CompletableFuture你可以使用 CompletableFuture 达到以上所有目的。
CompletableFuture 实现了 Future 和 CompletionStage接口并且提供了许多关于创建链式调用和组合多个 Future 的便利方法集而且有广泛的异常处理支持。
创建 CompletableFuture
1. 简单的例子 可以使用如下无参构造函数简单的创建 CompletableFuture
CompletableFutureString completableFuture new CompletableFutureString();
这是一个最简单的 CompletableFuture想获取CompletableFuture 的结果可以使用 CompletableFuture.get() 方法
String result completableFuture.get()
get() 方法会一直阻塞直到 Future 完成。因此以上的调用将被永远阻塞因为该Future一直不会完成。
你可以使用 CompletableFuture.complete() 手工的完成一个 Future
completableFuture.complete(Futures Result)
所有等待这个 Future 的客户端都将得到一个指定的结果并且 completableFuture.complete() 之后的调用将被忽略。
2. 使用 runAsync() 运行异步计算 如果你想异步的运行一个后台任务并且不想改任务返回任务东西这时候可以使用 CompletableFuture.runAsync()方法它持有一个Runnable 对象并返回 CompletableFutureVoid。
// Run a task specified by a Runnable Object asynchronously.
CompletableFutureVoid future CompletableFuture.runAsync(new Runnable() {Overridepublic void run() {// Simulate a long-running Jobtry {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println(Ill run in a separate thread than the main thread.);}
});// Block and wait for the future to complete
future.get()
你也可以以 lambda 表达式的形式传入 Runnable 对象
// Using Lambda Expression
CompletableFutureVoid future CompletableFuture.runAsync(() - {// Simulate a long-running Job try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println(Ill run in a separate thread than the main thread.);
});
在本文中我使用lambda表达式会比较频繁如果以前你没有使用过建议你也多使用lambda 表达式。
3. 使用 supplyAsync() 运行一个异步任务并且返回结果 当任务不需要返回任何东西的时候 CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样
CompletableFuture.supplyAsync() 就是你的选择。它持有supplierT 并且返回CompletableFutureTT 是通过调用 传入的supplier取得的值的类型。
// Run a task specified by a Supplier object asynchronously
CompletableFutureString future CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of the asynchronous computation;}
});// Block and get the result of the Future
String result future.get();
System.out.println(result);
SupplierT 是一个简单的函数式接口表示supplier的结果。它有一个get()方法该方法可以写入你的后台任务中并且返回结果。
你可以使用lambda表达式使得上面的示例更加简明
// Using Lambda Expression
CompletableFutureString future CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of the asynchronous computation;
});
一个关于Executor 和Thread Pool笔记 你可能想知道我们知道runAsync() 和supplyAsync()方法在单独的线程中执行他们的任务。但是我们不会永远只创建一个线程。 CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。 但是你也可以创建一个线程池并传给runAsync() 和supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。 CompletableFuture API 的所有方法都有两个变体-一个接受Executor作为参数另一个不这样// Variations of runAsync() and supplyAsync() methods
static CompletableFutureVoid runAsync(Runnable runnable)
static CompletableFutureVoid runAsync(Runnable runnable, Executor executor)
static U CompletableFutureU supplyAsync(SupplierU supplier)
static U CompletableFutureU supplyAsync(SupplierU supplier, Executor executor)
创建一个线程池并传递给其中一个方法
Executor executor Executors.newFixedThreadPool(10);
CompletableFutureString future CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of the asynchronous computation;
}, executor);
在 CompletableFuture 转换和运行
CompletableFuture.get()方法是阻塞的。它会一直等到Future完成并且在完成后返回结果。 但是这是我们想要的吗对于构建异步系统我们应该附上一个回调给CompletableFuture当Future完成的时候自动的获取结果。 如果我们不想等待结果返回我们可以把需要等待Future完成执行的逻辑写入到回调函数中。
可以使用 thenApply(), thenAccept() 和thenRun()方法附上一个回调给CompletableFuture。
1. thenApply() 可以使用 thenApply() 处理和改变CompletableFuture的结果。持有一个FunctionR,T作为参数。FunctionR,T是一个简单的函数式接口接受一个T类型的参数产出一个R类型的结果。
// Create a CompletableFuture
CompletableFutureString whatsYourNameFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Rajeev;
});// Attach a callback to the Future using thenApply()
CompletableFutureString greetingFuture whatsYourNameFuture.thenApply(name - {return Hello name;
});// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev
你也可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。
CompletableFutureString welcomeText CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Rajeev;
}).thenApply(name - {return Hello name;
}).thenApply(greeting - {return greeting , Welcome to the CalliCoder Blog;
});System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog
2. thenAccept() 和 thenRun() 如果你不想从你的回调函数中返回任何东西仅仅想在Future完成后运行一些代码片段你可以使用thenAccept() 和 thenRun()方法这些方法经常在调用链的最末端的最后一个回调函数中使用。CompletableFuture.thenAccept() 持有一个ConsumerT 返回一个CompletableFutureVoid。它可以访问CompletableFuture的结果
// thenAccept() example
CompletableFuture.supplyAsync(() - {return ProductService.getProductDetail(productId);
}).thenAccept(product - {System.out.println(Got product detail from remote service product.getName())
});
虽然thenAccept()可以访问CompletableFuture的结果但thenRun()不能访Future的结果它持有一个Runnable返回CompletableFutureVoid
// thenRun() example
CompletableFuture.supplyAsync(() - {// Run some computation
}).thenRun(() - {// Computation Finished.
});
异步回调方法的笔记 CompletableFuture提供的所有回调方法都有两个变体 // thenApply() variants U CompletableFutureU thenApply(Function? super T,? extends U fn) U CompletableFutureU thenApplyAsync(Function? super T,? extends U fn) U CompletableFutureU thenApplyAsync(Function? super T,? extends U fn, Executor executor) 这些异步回调变体通过在独立的线程中执行回调任务帮助你进一步执行并行计算。 以下示例CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Some Result
}).thenApply(result - {/* Executed in the same thread where the supplyAsync() task is executedor in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)*/return Processed Result
})
在以上示例中在thenApply()中的任务和在supplyAsync()中的任务执行在相同的线程中。任何supplyAsync()立即执行完成,那就是执行在主线程中尝试删除sleep测试下。 为了控制执行回调任务的线程你可以使用异步回调。如果你使用thenApplyAsync()回调将从ForkJoinPool.commonPool()获取不同的线程执行。
CompletableFuture.supplyAsync(() - {return Some Result
}).thenApplyAsync(result - {// Executed in a different thread from ForkJoinPool.commonPool()return Processed Result
})
此外如果你传入一个Executor到thenApplyAsync()回调中任务将从Executor线程池获取一个线程执行。
Executor executor Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() - {return Some result
}).thenApplyAsync(result - {// Executed in a thread obtained from the executorreturn Processed Result
}, executor);
组合两个CompletableFuture
1. 使用 thenCompose() 组合两个独立的future 假设你想从一个远程API中获取一个用户的详细信息一旦用户信息可用你想从另外一个服务中获取他的贷方。 考虑下以下两个方法getUserDetail() 和getCreditRating()的实现
CompletableFutureUser getUsersDetail(String userId) {return CompletableFuture.supplyAsync(() - {UserService.getUserDetails(userId);});
}CompletableFutureDouble getCreditRating(User user) {return CompletableFuture.supplyAsync(() - {CreditRatingService.getCreditRating(user);});
}
现在让我们弄明白当使用了thenApply()后是否会达到我们期望的结果-
CompletableFutureCompletableFutureDouble result getUserDetail(userId)
.thenApply(user - getCreditRating(user));
在更早的示例中Supplier函数传入thenApply将返回一个简单的值但是在本例中将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。 如果你想获取最终的结果给最顶层future使用 thenCompose()方法代替-
CompletableFutureDouble result getUserDetail(userId)
.thenCompose(user - getCreditRating(user));
因此规则就是-如果你的回调函数返回一个CompletableFuture但是你想从CompletableFuture链中获取一个直接合并后的结果这时候你可以使用thenCompose()。
2. 使用thenCombine()组合两个独立的 future 虽然thenCompose()被用于当一个future依赖另外一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候用来做一些事情。
System.out.println(Retrieving weight.);
CompletableFutureDouble weightInKgFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return 65.0;
});System.out.println(Retrieving height.);
CompletableFutureDouble heightInCmFuture CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return 177.8;
});System.out.println(Calculating BMI.);
CompletableFutureDouble combinedFuture weightInKgFuture.thenCombine(heightInCmFuture, (weightInKg, heightInCm) - {Double heightInMeter heightInCm/100;return weightInKg/(heightInMeter*heightInMeter);
});System.out.println(Your BMI is - combinedFuture.get());
当两个Future都完成的时候传给thenCombine()的回调函数将被调用。
组合多个CompletableFuture
我们使用thenCompose() 和 thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture应该怎么做我们可以使用以下两个方法组合任意数量的CompletableFuture。
static CompletableFutureVoid allOf(CompletableFuture?... cfs)
static CompletableFutureObject anyOf(CompletableFuture?... cfs)
1. CompletableFuture.allOf()CompletableFuture.allOf的使用场景是当你一个列表的独立future并且你想在它们都完成后并行的做一些事情。
假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作但是这非常消耗时间。因此你想写一个函数传入一个页面链接返回一个CompletableFuture异步的下载页面内容。
CompletableFutureString downloadWebPage(String pageLink) {return CompletableFuture.supplyAsync(() - {// Code to download and return the web pages content});
}
现在当所有的页面已经下载完毕你想计算包含关键字CompletableFuture页面的数量。可以使用CompletableFuture.allOf()达成目的。
ListString webPageLinks Arrays.asList(...) // A list of 100 web page links// Download contents of all the web pages asynchronously
ListCompletableFutureString pageContentFutures webPageLinks.stream().map(webPageLink - downloadWebPage(webPageLink)).collect(Collectors.toList());// Create a combined Future using allOf()
CompletableFutureVoid allFutures CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);
使用CompletableFuture.allOf()的问题是它返回CompletableFutureVoid。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。
// When all the Futures are completed, call future.join() to get their results and collect the results in a list -
CompletableFutureListString allPageContentsFuture allFutures.thenApply(v - {return pageContentFutures.stream().map(pageContentFuture - pageContentFuture.join()).collect(Collectors.toList());
});
花一些时间理解下以上代码片段。当所有future完成的时候我们调用了future.join()因此我们不会在任何地方阻塞。
join()方法和get()方法非常类似这唯一不同的地方是如果最顶层的CompletableFuture完成的时候发生了异常它会抛出一个未经检查的异常。
现在让我们计算包含关键字页面的数量。
// Count the number of web pages having the CompletableFuture keyword.
CompletableFutureLong countFuture allPageContentsFuture.thenApply(pageContents - {return pageContents.stream().filter(pageContent - pageContent.contains(CompletableFuture)).count();
});System.out.println(Number of Web Pages having CompletableFuture keyword - countFuture.get());
2. CompletableFuture.anyOf()
CompletableFuture.anyOf()和其名字介绍的一样当任何一个CompletableFuture完成的时候【相同的结果类型】返回一个新的CompletableFuture。以下示例
CompletableFutureString future1 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of Future 1;
});CompletableFutureString future2 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of Future 2;
});CompletableFutureString future3 CompletableFuture.supplyAsync(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new IllegalStateException(e);}return Result of Future 3;
});CompletableFutureObject anyOfFuture CompletableFuture.anyOf(future1, future2, future3);System.out.println(anyOfFuture.get()); // Result of Future 2
在以上示例中当三个中的任何一个CompletableFuture完成 anyOfFuture就会完成。因为future2的休眠时间最少因此她最先完成最终的结果将是future2的结果。
CompletableFuture.anyOf()传入一个Future可变参数返回CompletableFutureObject。CompletableFuture.anyOf()的问题是如果你的CompletableFuture返回的结果是不同类型的这时候你讲会不知道你最终CompletableFuture是什么类型。
CompletableFuture 异常处理
我们探寻了怎样创建CompletableFuture转换它们并组合多个CompletableFuture。现在让我们弄明白当发生错误的时候我们应该怎么做。
首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链
CompletableFuture.supplyAsync(() - {// Code which might throw an exceptionreturn Some result;
}).thenApply(result - {return processed result;
}).thenApply(result - {return result after further processing;
}).thenAccept(result - {// do something with the final result
});
如果在原始的supplyAsync()任务中发生一个错误这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误这时候第二个和第三个将不会被调用同样的future将以异常结束。
1. 使用 exceptionally() 回调处理异常exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。
Integer age -1;CompletableFutureString maturityFuture CompletableFuture.supplyAsync(() - {if(age 0) {throw new IllegalArgumentException(Age can not be negative);}if(age 18) {return Adult;} else {return Child;}
}).exceptionally(ex - {System.out.println(Oops! We have an exception - ex.getMessage());return Unknown!;
});System.out.println(Maturity : maturityFuture.get());
2. 使用 handle() 方法处理异常 API提供了一个更通用的方法 - handle()从异常恢复无论一个异常是否发生它都会被调用。
Integer age -1;CompletableFutureString maturityFuture CompletableFuture.supplyAsync(() - {if(age 0) {throw new IllegalArgumentException(Age can not be negative);}if(age 18) {return Adult;} else {return Child;}
}).handle((res, ex) - {if(ex ! null) {System.out.println(Oops! We have an exception - ex.getMessage());return Unknown!;}return res;
});System.out.println(Maturity : maturityFuture.get());
如果异常发生res参数将是 null否则ex将是 null。