Java 8: CompletableFuture vs. Parallel Stream
This article compares Java 8's CompletableFuture with parallel streams for asynchronous computation.
We use the following long-running task as our example:
class MyTask {
    private final int duration;

    public MyTask(int duration) {
        this.duration = duration;
    }

    public int calculate() {
        System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(duration * 1000);
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }
        return duration;
    }
}
Let's create ten tasks, each of which takes one second to run:
List<MyTask> tasks = IntStream.range(0, 10)
        .mapToObj(i -> new MyTask(1))
        .collect(toList());
How can we run these tasks most efficiently?
First, sequential execution:
public static void runSequentially(List<MyTask> tasks) {
    long start = System.nanoTime();
    List<Integer> result = tasks.stream()
            .map(MyTask::calculate)
            .collect(toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
}
The tasks run one after another on the main thread, which takes 10 seconds.
Next, let's look at using a parallel stream:
public static void useParallelStream(List<MyTask> tasks) {
    long start = System.nanoTime();
    List<Integer> result = tasks.parallelStream()
            .map(MyTask::calculate)
            .collect(toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
}
The output is:
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis
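This approach took 3 seconds because only 4 tasks could run in parallel at a time: 3 threads from the ForkJoinPool common pool plus the main thread.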
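As a rough check on the thread count above, the sketch below prints the common pool's parallelism on the current machine. By default the common pool is sized to one less than the number of available processors (with a minimum of one), and the size can be overridden with the system property java.util.concurrent.ForkJoinPool.common.parallelism. The three worker threads in the output therefore suggest a four-core machine, which is an inference and not something the measurements state. The class name CommonPoolInfo and the helper parallelStreamWorkers are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Threads that can work on a parallel stream started from an ordinary thread:
    // the common-pool workers plus the calling thread itself.
    static int parallelStreamWorkers() {
        return ForkJoinPool.getCommonPoolParallelism() + 1;
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("parallel stream workers: " + parallelStreamWorkers());
    }
}
```

The printed values depend on the machine and on any system property override, so no fixed output is shown here.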
Now let's look at the CompletableFuture approach:
public static void useCompletableFuture(List<MyTask> tasks) {
    long start = System.nanoTime();
    List<CompletableFuture<Integer>> futures = tasks.stream()
            .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
            .collect(Collectors.toList());
    List<Integer> result = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
}
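This code first collects a list of CompletableFutures and then calls join on each one to wait for them to complete one by one. Note that join is equivalent to get, except that it throws no checked exceptions, which makes it more convenient to use inside lambda expressions.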
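The difference between join and get can be seen in a minimal sketch like the one below. It uses a future that has already failed, so no threads or sleeps are involved. The class JoinVsGet and its two helper methods are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGet {
    // join() signals failure with the unchecked CompletionException,
    // so it can be called from a lambda without a try/catch.
    static String viaJoin(CompletableFuture<Integer> future) {
        try {
            return "value " + future.join();
        } catch (CompletionException e) {
            return "join: " + e.getCause().getMessage();
        }
    }

    // get() declares the checked InterruptedException and ExecutionException,
    // which the caller must handle or rethrow.
    static String viaGet(CompletableFuture<Integer> future) {
        try {
            return "value " + future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "get: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(viaJoin(failed)); // prints "join: boom"
        System.out.println(viaGet(failed));  // prints "get: boom"
    }
}
```

In both cases the original exception is available as the cause; only the wrapper type and whether it is checked differ.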
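Two separate stream pipelines are used here instead of chaining the two map operations in a single pipeline. Intermediate stream operations are lazy, so in a single pipeline each future would be joined before the next one was even created, and the tasks would run sequentially. Collecting the CompletableFutures into a list first lets all of them start before we wait for any of them to complete.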
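The effect of laziness can be made visible by counting how many tasks are running at the same time. The following is a sketch under stated assumptions: work is a hypothetical stand-in for MyTask.calculate() that sleeps 100 ms instead of one second, and a fixed thread pool is used so that the result does not depend on the size of the common pool. The class and method names are invented for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class LazyPipelineDemo {
    private static final AtomicInteger running = new AtomicInteger();
    private static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical task: sleeps briefly and records the peak number of concurrent tasks.
    static int work(int id) {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            running.decrementAndGet();
        }
        return id;
    }

    // Single pipeline: each element is submitted and joined before the next is submitted.
    static int peakWithSinglePipeline(int n) {
        peak.set(0);
        ExecutorService executor = Executors.newFixedThreadPool(n);
        try {
            IntStream.range(0, n).boxed()
                    .map(i -> CompletableFuture.supplyAsync(() -> work(i), executor))
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    // Two pipelines: all futures are submitted first, then joined.
    static int peakWithTwoPipelines(int n) {
        peak.set(0);
        ExecutorService executor = Executors.newFixedThreadPool(n);
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, n).boxed()
                    .map(i -> CompletableFuture.supplyAsync(() -> work(i), executor))
                    .collect(Collectors.toList());
            futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("single pipeline peak: " + peakWithSinglePipeline(4));
        System.out.println("two pipelines peak:   " + peakWithTwoPipelines(4));
    }
}
```

The single pipeline never has more than one task in flight, while the two-pipeline version should normally reach a peak equal to the number of tasks.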
The output is:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis
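Processing the ten tasks took 4 seconds. Only the 3 ForkJoinPool common-pool threads were used; unlike the parallel stream, the main thread did not run any of the tasks and only waited in join for the results.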
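To see which threads pick up the work in each style, a small sketch such as the following can collect the thread names instead of printing them. The class WhoRunsTheTask, its methods, and the 50 ms pause are assumptions made for illustration and are not part of the example above.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class WhoRunsTheTask {
    // Short pause so that the work is spread over several threads.
    private static void pause() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Names of the threads that execute the body of a parallel stream.
    // The calling thread usually takes part in the work.
    static Set<String> parallelStreamThreads(int n) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel().forEach(i -> {
            pause();
            names.add(Thread.currentThread().getName());
        });
        return names;
    }

    // Names of the threads that execute suppliers passed to supplyAsync.
    // The calling thread only submits the tasks and joins on the results.
    static Set<String> supplyAsyncThreads(int n) {
        List<CompletableFuture<String>> futures = IntStream.range(0, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    pause();
                    return Thread.currentThread().getName();
                }))
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("parallel stream: " + parallelStreamThreads(10));
        System.out.println("supplyAsync:     " + supplyAsyncThreads(10));
    }
}
```

The exact names vary from run to run. The point to look for is whether the caller's own thread name appears in each set.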
Next, let's combine CompletableFuture with a custom Executor:
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
    long start = System.nanoTime();
    ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
    List<CompletableFuture<Integer>> futures = tasks.stream()
            .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
            .collect(Collectors.toList());
    List<Integer> result = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
    executor.shutdown();
}
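You can supply your own Executor, sized to suit the characteristics of your application. The workload here is not CPU-bound, so the number of threads can exceed Runtime.getRuntime().availableProcessors().
The output is: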
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis
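Running the 10 tasks took one second, which makes this the fastest approach. If your computation is I/O-bound, you can increase the size of the thread pool. If it is CPU-bound, there is no point in having more threads than processors, and in that case a parallel stream is the better choice.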
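The four timings follow from simple arithmetic: with equal one-second tasks, the wall time in seconds is roughly the number of tasks divided by the number of threads doing the work, rounded up. A minimal sketch of that calculation, using a hypothetical helper named rounds, looks like this:

```java
class WallTimeEstimate {
    // Number of one-second rounds needed when `workers` threads share `tasks` equal tasks.
    static int rounds(int tasks, int workers) {
        return (tasks + workers - 1) / workers; // integer ceiling division
    }

    public static void main(String[] args) {
        System.out.println(rounds(10, 1));  // sequential, main thread only: 10
        System.out.println(rounds(10, 4));  // parallel stream, 3 pool threads plus main: 3
        System.out.println(rounds(10, 3));  // CompletableFuture on the common pool: 4
        System.out.println(rounds(10, 10)); // CompletableFuture with a 10-thread executor: 1
    }
}
```

These estimates line up with the measured 3043, 4010, and 1009 milliseconds; the small remainder is scheduling and startup overhead.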