Java企业教程系列
Java8流和forkjoin并行计算
Java 8 API一个大特点是它的流Stream API,它消除了代码中几乎所有的循环,让你能够编写更富有表现力和更加精准的代码。
让我们假定并行运行很多任务,每个任务都要打印出所在线程的名字,在它们完成后我们还可以继续执行它们。使用ExecutorService的代码如下:
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
// TODO handle...
}
这里有很多代码,我们可以做得更好,可以使用Stream API来实现:
void doInParallelNTimes(int times, Runnable op) {
IntStream.range(0, times).parallel().forEach(i -> op.run());
}
调用这个方法的代码如下:
doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));
执行结果输出:
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
以上是使用Stream小试牛刀,下面我们看看一个并行计算的案例。
这次我们不再做同样事情N多次了,而是使用Stream API并行产生许多不同的任务,我们能创建一个带有值集合的流,基于它们有一个并行执行的函数,最后聚合结果(将数据收集到集合,然后reduce汇聚到一个值 map/reduce?),这次任务是计算首个45 斐波那契数字的总和:
public class Tester {
public static void main(String[] args) {
Stopwatch stopwatch = Stopwatch.createStarted();
IntStream.range(1, 45).parallel().map(Tester::fib).sum();
System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");
stopwatch.reset();
stopwatch.start();
IntStream.range(1, 45).map(Tester::fib).sum();
System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
}
private static int fib(int n) {
if (n == 1 || n == 2) {
return 1;
} else {
return fib(n - 1) + fib(n - 2);
}
}
}
首先创建一个与我们所要运行的并行任务描述流。然后并行调用所有的函数。最后返回的所有结果的总和。可以抽象为:创造一个任意值的流(包括丰富的Java对象)并对其执行一个普通的操作。
输出结果:
Parallel took 3078 ms
Sequential took 7327 ms