Java8的CompletableFuture

  Java8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,该模型不同于以往Java版本,不会造成堵塞Blocking。过去,Java 5并发包主要聚焦于异步任务处理,其模型特点是基于一个生产者线程,不断地创造任务,通过一个堵塞Blocking队列传递给任务的消费者,这个模型在Java 7和Java 8以后使用了另外一种任务执行模型,同时将一个任务的数据分解到子集中,这些子集能够分别被同样的子任务独立地处理。这种风格后面的基础库包就是 fork/join框架。

  fork/join框架允许程序员指定一个数据集如何被切分多个子任务,将子任务提交一个标准默认的线程池:通用的ForkJoinPool。在Java 8中fork/join并行还可以通过并行流机制访问获得,但是这种方式的并行处理是有成本的和前提条件的:首先,元素处理必须能够独立进行,其次,数据集必须足够大,每个元素处理的消耗成本需要足够高,因为设置与启动fork/join框架本身也有一定的消耗,这些消耗相对于数据集合中每个元素处理的成本来说可以忽略不计,否则就不是很划算。

  Java 8的CompletableFuture背后也是依靠fork/join框架启动新的线程实现异步与并发的,一般情况下,我们将一个任务放到另外一个线程执行,可能就无需等待那个线程处理完成的结果,而是直接在主线程中返回完成,但是有一些业务恰是需要等待新启动的线程中任务完成,然后和当前主线程中的处理进行合并再处理,比如下面代码我们需要在另外一个线程进行很长时间的运行。

CompletableFuture  futureCount  =   CompletableFuture . supplyAsync (  
      ()   ->   {  
          try   {  
              // Simulate long running task  模拟长时间运行任务
              Thread . sleep ( 5000 );  
          }   catch   ( InterruptedException  e )   {   }  
          return   10 ;  
      });  
 //现在可以同时在当前主线程做其他事情,不用等待上面长时间运行任务结束

CompletableFuture.supplyAsync 允许你基于ForkJoinPool 异步地运行一个任务,同时也有选项供你选择更多对线程池的控制。下面是获得长时间运行任务的返回结果:

try   {  
      int  count  =  futureCount . get ();  
      System . out . println ( count );  
 }   catch   ( InterruptedException   |   ExecutionException  ex )   {  
      // Exceptions that occur in future will be thrown here.  
 } 

当对CompletableFuture的实力进行.get()方法调用时,在计算过程中任何exception将被抛出。

创建和获得CompletableFuture有下面四个方法,主要是supplyAsunc和runAsync两种,后者提供的方法参数必须是线程的Runnable,因为Runnable是不会返回任何结果,所以,如果你需要异步处理并返回结果,应该使用Supplier<U>。

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

我们需要将长时间任务放入supplyAsync方法体中,传统写法如下:

final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...长时间运行...
        return "42";
    }
}, executor);

使用java8的lambda则是如下:

final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...长时间运行...
    return "42";
}, executor);

甚至简化如下:

final CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

 

thenApply用法

  从函数编程角度看,CompletableFuture 其实是一个monad和一个functor,如果你有Javacript编程经验,经常会返回或注册一个异步的callback回调函数,这样,我们就不必一直堵塞等待其处理完成再进行其他处理,这也是一种Future概念,意思是:当结果长时间计算出来以后,在结果上运行这个函数。我们可以运行多个这样的回调性质的函数,这可以使用CompletableFuture的 thenApply()方法:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

第一个没有Async的thenApply方法是指在future完成的当前线程应用参数中函数,而后面两个带Async的方法则是在不同线程池异步地应用参数中的函数。

我们以字符串转换为整数功能为例,如下:

CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

或者一句:

CompletableFuture<Double> f3 =
    f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

这里定义了两个theApply转换功能,第一次是从字符串转换到Integer,然后再到Double。这些转换不是立即执行也不会堵塞,只是简单地记住,只有当f1完成以后才会执行这两个转换,如果一些转换动作很花费时间,你可以使用线程池异步处理。

 

thenAccept/thenRun用法

  在长时间计算完成后可以经过上面转换,但是在最后阶段有两个方法:

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

  thenAccept()提供了final最后的值,而thenRun执行Runnable就不会返回任何计算好的值或结果了。

future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");

  这两个方法是不会堵塞的,即使没有指定executor,可以将它们看成是对未来结果的监听者。

 

thenCompose()用法

  CompletableFuture的异步处理非常不错,有时,你需要在一个future值结构运行某个函数,但是这个函数也是返回某种future,也就是说是两个future彼此依赖串联在一起,它类似于Scala的中的flatMap。

<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);

下面是比较thenCompose和thenApply的不同案例(类似于Scala的map和flatMap的不同):

CompletableFuture<Document> docFuture = //...
 
CompletableFuture<CompletableFuture<Double>> f =
    docFuture.thenApply(this::calculateRelevance);
 
CompletableFuture<Double> relevanceFuture =
    docFuture.thenCompose(this::calculateRelevance);
 
//...
 
private CompletableFuture<Double> calculateRelevance(Document doc)  //...

thenCompose()是能够建立健壮 异步管道pipeline的方法,没有任何堵塞或中间步骤。

 

thenCombine()用法

  上面thenCompose()是用于多个彼此依赖的futrue进行串联起来,而thenCombine则是并联起两个独立的future,注意,这些future都是在长时间计算都完成以后。

<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

假设你有两个CompletableFuture,一个是加载Customer,而另外一个是加载最近商店Shop,它们两个都彼此独立,但是当都加载计算完毕以后,你需要使用它们的值计算路径:

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
    customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
 
//...
 
private Route findRoute(Customer customer, Shop shop) //...

在Java8中可以使用this::findRoute 替代(cust, shop) -> findRoute(cust, shop) :

customerFuture.thenCombine(shopFuture, this::findRoute);

当我们有了customerFuture和shopFuture,那么routeFuture会包装它们两个并等待两个都完成计算,当都完成长时间计算以后,routeFuture会运行我们提供的函数findRoute(),routeFuture完成标志前两个前提future已经完成并且findRoute()也完成了。

当有多个CompletableFuture一起工作时,比如你希望在一个CompletableFuture返回的值和另外一个CompletableFuture返回值一起组合在一起再处理,可以使用thenCombine函数,

下篇

异步编程

并发编程

Java8

fork/join

Reactive编程