Java8的CompletableFuture之二

上篇

  前面章节介绍了 CompletableFuture的一些基础功能,现在介绍复杂的一些高级功能。上一节介绍了onCombine是用来并行组合两个独立的Future,这需要建立一个新的CompletableFuture来组合利用两个结果,如果我们现在只是要求两个future计算完成后通知一下,怎么办?可以使用thenAcceptBoth()/runAfterBoth()。

thenAcceptBoth()/runAfterBoth()用法

  这两个方法类似thenAccept()和thenRun(),不同之处是等待两个future完成,而不是等待一个future完成。

<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

比如,假设两个future计算完成以后,我们只是需要立即发送一些事件,或者立即刷新GUI,这样就不必再建立一个新的future了,可以使用thenAcceptBoth():

customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
    final Route route = findRoute(cust, shop);
    //使用rout值刷新界面
});

这种方法比堵塞等待原来两个future要更异步性和具有事件驱动意义。

 

acceptEither/runAfterEither用法

  CompletableFuture 还能等待第一个future而不是所有future计算完毕,当你有两个任务,两个都是同样类型的结果,你只关心响应时间,谁快用谁的结果,不关心具体哪个任务首先返回。

CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

比如,你有两个系统需要继承,一个是会有很快的响应时间但是经常有偏差,而另外一个是慢一些,但是更具有可预期性。为了获得性能和预期性两种好处,你能同时调用这两个系统,等待其中一个第一个完成:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
    System.out.println("Result: " + s);
});

s可能是fetchFast()或fetchPredictably(),我们不关心,我们关系只要能第一个返回就可以,这样有时会第一个方法,有时会第二个方法返回,随机性可让我们充分利用计算机当时运行的情况潜力。

 

applyToEither()用法

  applyToEither() 是acceptEither()的哥哥. 当两个future其中一个完成后,后者用于只是简单地调用一些代码,applyToEither()会返回一个新的future. 这个future是在前面两个future其中一个完成后进行执行完成。

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
    fast.applyToEither(predictable, Function.<String>identity());

firstDone 这个future能够被到处传递,从客户端调用角度看,前面两个future是隐藏在firstDone后面的。

 

allOf用法

  现在我们知道了如何等待两个之中一个首先完成,但是如果不是两个,而是很多个,那么就要使用:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

注意static,allOf()获得一个future数组然后当所有future都完成后返回一个future,注意,这里只是等待所有的future中最快完成。

 

RxJava结合

  使用CompletableFuture可以和RxJava结合一起使用。RxJava的被观察者Observable类似一个对象流,能够主动异步推送消息给订阅者(观察者),我们以本文开始第一个案例,也就是计算器count为例,实现同样功能:

public   CompletableFuture  countEvents ()   {  
      CompletableFuture  result  =   new   CompletableFuture <>();  
      AtomicInteger  count  =   new   AtomicInteger ();  
      Observable . just ( "1" ,   "2" ,   "3" ,   "err" ,   "4" ). subscribe ( ev  ->   {  
              try   {  
                  int  x  =   Integer . parseInt ( ev );  
                 count . set ( count . get ()   +  x );  
              }   catch   ( NumberFormatException  e )   {   }  
          },  
         throwable  ->  result . complete ( 0 );  
          ()   ->   {  
              try   {  
                  //simulate io delay  
                  Thread . sleep ( 3000 );  
              }   catch   ( InterruptedException  e )   {   }  
             result . complete ( count . get ());  
          }  
      );  
      return  result ;  
 } 

上述代码中Observable提供了1 2 3 err 4几个字符串流,假设遍历这个字符串流是一个费时长的工作,我们需要遍历后计算这个几个字符串的个数,调用代码如下:

CompletableFuture  data  =  countEvents ()  
      . thenApply ( count  ->   {  
          int  transformedValue  =  count  *   25 ;  
          return  transformedValue ;  
      }). thenApply ( transformed  ->   "data-"   +  transformed );  
  
try   {  
      System . out . println ( data . get ());  
 }   catch   ( InterruptedException   |   ExecutionException  e )   {  
     e . printStaceTrace (); 

 

RxJava教程

异步编程

并发编程

Java8

fork/join