发帖  主题  评论  推荐  标签 用户 查搜   用户 密码 自动 注册  
 
面向对象 设计模式 领域驱动设计 企业架构 框架 开发教程 微服务 大数据 扩展性 并发编程 事件驱动 分布式 SOA

Java8的completablefuture和parallel stream比较

  本文介绍ava 8的 CompletableFuture与并行流parallel stream在异步计算方面的比较。

我们以下面需呀长时间运行的任务模型为案例:

class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000);
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return duration;
  }
}

让我们创建十个任务,间隔一秒执行

List<MyTask> tasks = IntStream.range(0, 10)
                                    .mapToObj(i -> new MyTask(1))
                                    .collect(toList());

 

如何实现这个任务更有效率呢?

首先是顺序执行,如下代码:

public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

这个任务在main主线程中一个接着一个执行,花费了10秒。

下面我们看看使用parallel stream:

public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

输出结果是:

main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis

 

这个方式花费了3秒,因为有4个任务并行运行,使用了ForkJoin的3个线程。

再看看CompletablFutrue方式,代码如下:

public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());
 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

 

在这个代码中获得了CompletableFuture的List,然后调用此future上Join方法等待它们一个个完成,注意join等同于get,唯一区别是前者不会扔出任何检查意外exception,更加方便于lambda表达式。

这里使用了两个分类的流管道,而不是每个放入各自的map操作,因为内部流操作是赖计算,这就能在等待他们计算完成前收集CompletableFuture到一个list中然后才会开始计算。

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis

 

花了4秒处理了十个任务,只有3个FirkJoinPool使用,不像parallel stream,主线程并没有使用,因此不会堵塞主线程。

下面我们再看看使用订制的执行器结合CompletableFuture:

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList()); 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}

 

你可以定制一个自己的Executor来进行计算,根据你的应用特点,这个应用不是CPU密集型,因此可以增加线程数量,超过Runtime.getRuntime().getAvailableProcessors()。

输出结果:

pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis

 

花了一秒执行了10个任务,该方式最快。 如果你是I/O密集型的计算,可以增加线程池大小,如果是CPU密集型计算,就不能增加比你的处理器个数再多的线程了,这是使用parallel stream比较好一些。

Java8专题

性能专题

解道移动版 | 关注解道 | 联系解道 | 关于解道 | 广告联系 | 网站地图 | 设为首页

沪ICP证12033263 如有意见请与我们联系 Powered by JdonFramework
返回顶部