使用CompletableFuture实现Java方法异步调用和回调

Java方法的异步调用最容易联想到使用线程,将需要异步执行的方法放在另外一个线程中执行:


new Thread(() -> {
//Do whatever
}).start();

如果想准确地等待并获得返回的结果,可以使用Java中Future是用来实现异步计算,其计算结果需要通过 get方法获取,问题是在计算完成之前调用get是阻塞的,这造成了非常严格的使用限制,使异步计算毫无意义。

CompletableFuture除了实现Future接口外,还实现了CompletionStage接口。 CompletionStage是一种承诺。它承诺最终将完成计算,最棒的是CompletionStage是它提供了大量的方法,可以让你能附加方法,在其完成时对这些方法执行的回调,这样我们就可以以非阻塞的方式构建系统。

最简单的异步计算


CompletableFuture.supplyAsync(this::sendMsg);

就这么简单。

supplyAsync方法参数需要提供一个方法,这个方法是我们想要异步执行的代码-在这里这个方法是sendMsg()。

这段异步代码被提交给ForkJoinPool.commonPool()执行

附加回调

回调的优点在于:在完成异步计算时再执行什么,而无需我们主动去等待结果,在第一个例子中,我们只是通过sendMsg在自己的线程中执行来异步发送消息。现在让我们添加一个回调函数,我们会在其中通知发送消息的方式。


CompletableFuture.supplyAsync(this::sendMsg)
.thenAccept(this::notify);

thenAccept是添加回调的众多方法之一。它需要一个Consumer- 在我们的例子中notify- 它在sendMsg完成时处理其结果,也就是说sendMsg方法的输出结果正好是notify方法的入参。

链接多个回调
如果要继续将值从一个回调传递到另一个回调,thenAccept则不会删除它,因为Consumer不返回任何内容。

为了保持传递值,您可以简单地使用thenApply。

thenApply接受一个Function,但也返回一个Function:


CompletableFuture.supplyAsync(this::findReceiver)
.thenApply(this::sendMsg)
.thenAccept(this::notify);

异步任务将首先找到一个接收器findReceiver,然后在将结果传递给最后一个回调通知notify之前向接收器发送一条消息sendMsg。

构建异步系统
如果我们继续使用thenApply上面的例子,我们最终会得到嵌套的CompletionStage:


CompletableFuture.supplyAsync(this::findReceiver)
.thenApply(this::sendMsgAsync);

// Returns type CompletionStage<CompletionStage<String>>

这种嵌套的返回不是我们希望的,可以使用thenCompose,它能给一个Function返回CompletionStage。这将具有像flatMap一样的展平效果。

CompletableFuture.supplyAsync(this::findReceiver)
.thenCompose(this::sendMsgAsync);

// Returns type CompletionStage<String>

通过这种方式,我们可以在不丢失一层的情况下继续编写新功能CompletionStage

将回调作为单独的任务
到目前为止,我们所有的回调都在与它们的前任相同的线程上执行。如果您愿意,可以ForkJoinPool.commonPool()单独提交回调,而不是使用与前一个相同的线程,可以通过使用带Async异步后缀的方法版本来完成CompletionStage.

先看看再一个线程上执行:


CompletableFuture<String> receiver
= CompletableFuture.supplyAsync(this::findReceiver);

receiver.thenApply(this::sendMsg);
receiver.thenApply(this::sendOtherMsg);

在上面的示例中,所有内容都将在同一个线程上执行。这导致最后一条消息等待第一条消息完成。

引入异步:


ompletableFuture<String> receiver
= CompletableFuture.supplyAsync(this::findReceiver);

receiver.thenApplyAsync(this::sendMsg);
receiver.thenApplyAsync(this::sendOtherMsg);

注意到thenApplyAsync的async后缀,每条消息都作为单独的任务提交给ForkJoinPool.commonPool(),回调方法sendMsg和sendOtherMsg执行时异步的,后者不会等待前者完成。

当一切都出错时该怎么办
当发生异常时,幸运的是,CompletableFuture有一个很好的处理方式,使用exceptionally:


CompletableFuture.supplyAsync(this::failingMsg)
.exceptionally(ex -> new Result(Status.FAILED))
.thenAccept(this::notify);

exceptionally 如果先前的计算失败并带有异常,则通过采用将执行的替代函数使我们有机会进行恢复回退。

回调会继续成功执行,只是输入参数是exceptionally执行的输出结果。

以受控方式处理超时
超时是我们编码人员经常需要照顾的事情。有时我们根本无法等待计算完成。这也适用于CompletableFutures。

这已经在Java 9中通过引入两种新方法来解决,这些方法将使我们能够处理超时 - orTimeout和completeOnTimeout。


CompletableFuture.supplyAsync(this::hangingMsg)
.orTimeout(1, TimeUnit.MINUTES);

如果我们的挂消息在一分钟内没有完成,TimeoutException则会抛出一条消息。然后可以通过exceptionally转到回调来处理此异常。

另一个选择是使用completeOnTimeout它,这比抛出异常更好,因为它允许我们以一种很好的控制方式恢复。


CompletableFuture.supplyAsync(this::hangingMsg)
.completeOnTimeout(new Result(Status.TIMED_OUT),1, TimeUnit.MINUTES);

回调取决于多个计算
如果一个的回调输入参数是需要依赖于两次计算结果,使用thenCombine:
下面代码是除了找到接收器之外,还要在发送消息之前执行创建一些内容的繁重工作:


CompletableFuture<String> to =
CompletableFuture.supplyAsync(this::findReceiver);

CompletableFuture<String> text =
CompletableFuture.supplyAsync(this::createContent);

to.thenCombine(text, this::sendMsg);

我们启动了两个异步作业 - 查找接收器并创建一些内容。然后我们thenCombine通过定义我们想要对这两个计算的结果做些什么。

回调取决于其中一个
我们现在已经介绍了依赖于两次计算的场景。现在,当你只需要其中一个的结果时呢?


CompletableFuture<String> firstSource =
CompletableFuture.supplyAsync(this::findByFirstSource);

CompletableFuture<String> secondSource =
CompletableFuture.supplyAsync(this::findBySecondSource);

firstSource.acceptEither(secondSource, this::sendMsg);

通过acceptEither实现等待两个计算结果,然后使用第二个返回的结果。

Java: Writing asynchronous code with CompletableFu