Java8的CompletableFuture之二
前面章节介绍了 CompletableFuture的一些基础功能,现在介绍复杂的一些高级功能。上一节介绍了onCombine是用来并行组合两个独立的Future,这需要建立一个新的CompletableFuture来组合利用两个结果,如果我们现在只是要求两个future计算完成后通知一下,怎么办?可以使用thenAcceptBoth()/runAfterBoth()。
thenAcceptBoth()/runAfterBoth()用法
这两个方法类似thenAccept()和thenRun(),不同之处是等待两个future完成,而不是等待一个future完成。
比如,假设两个future计算完成以后,我们只是需要立即发送一些事件,或者立即刷新GUI,这样就不必再建立一个新的future了,可以使用thenAcceptBoth():
这种方法比堵塞等待原来两个future要更异步性和具有事件驱动意义。
acceptEither/runAfterEither用法
CompletableFuture 还能等待第一个future而不是所有future计算完毕,当你有两个任务,两个都是同样类型的结果,你只关心响应时间,谁快用谁的结果,不关心具体哪个任务首先返回。
比如,你有两个系统需要继承,一个是会有很快的响应时间但是经常有偏差,而另外一个是慢一些,但是更具有可预期性。为了获得性能和预期性两种好处,你能同时调用这两个系统,等待其中一个第一个完成:
s可能是fetchFast()或fetchPredictably(),我们不关心,我们关系只要能第一个返回就可以,这样有时会第一个方法,有时会第二个方法返回,随机性可让我们充分利用计算机当时运行的情况潜力。
applyToEither()用法
applyToEither() 是acceptEither()的哥哥. 当两个future其中一个完成后,后者用于只是简单地调用一些代码,applyToEither()会返回一个新的future. 这个future是在前面两个future其中一个完成后进行执行完成。
firstDone 这个future能够被到处传递,从客户端调用角度看,前面两个future是隐藏在firstDone后面的。
allOf用法
现在我们知道了如何等待两个之中一个首先完成,但是如果不是两个,而是很多个,那么就要使用:
注意static,allOf()获得一个future数组然后当所有future都完成后返回一个future,注意,这里只是等待所有的future中最快完成。
RxJava结合
使用CompletableFuture可以和RxJava结合一起使用。RxJava的被观察者Observable类似一个对象流,能够主动异步推送消息给订阅者(观察者),我们以本文开始第一个案例,也就是计算器count为例,实现同样功能:
public CompletableFuture countEvents () {
CompletableFuture result = new CompletableFuture <>();
AtomicInteger count = new AtomicInteger ();
Observable . just ( "1" , "2" , "3" , "err" , "4" ). subscribe ( ev -> {
try {
int x = Integer . parseInt ( ev );
count . set ( count . get () + x );
} catch ( NumberFormatException e ) { }
},
throwable -> result . complete ( 0 );
() -> {
try {
//simulate io delay
Thread . sleep ( 3000 );
} catch ( InterruptedException e ) { }
result . complete ( count . get ());
}
);
return result ;
}
上述代码中Observable提供了1 2 3 err 4几个字符串流,假设遍历这个字符串流是一个费时长的工作,我们需要遍历后计算这个几个字符串的个数,调用代码如下:
CompletableFuture data = countEvents ()
. thenApply ( count -> {
int transformedValue = count * 25 ;
return transformedValue ;
}). thenApply ( transformed -> "data-" + transformed );
try {
System . out . println ( data . get ());
} catch ( InterruptedException | ExecutionException e ) {
e . printStaceTrace ();
RxJava教程