Java企业教程系列

Java8流和forkjoin并行计算

  Java 8 API一个大特点是它的流Stream API,它消除了代码中几乎所有的循环,让你能够编写更富有表现力和更加精准的代码。

  让我们假定并行运行很多任务,每个任务都要打印出所在线程的名字,在它们完成后我们还可以继续执行它们。使用ExecutorService的代码如下:

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 5; i++) {

    executor.submit(() -> System.out.println(Thread.currentThread()));

}

executor.shutdown();

try {

    executor.awaitTermination(1, TimeUnit.SECONDS);

} catch (InterruptedException ex) {

    // TODO handle...

}

  这里有很多代码,我们可以做得更好,可以使用Stream API来实现:

void doInParallelNTimes(int times, Runnable op) {

    IntStream.range(0, times).parallel().forEach(i -> op.run());

}

调用这个方法的代码如下:

doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

 

执行结果输出:

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

 

  以上是使用Stream小试牛刀,下面我们看看一个并行计算的案例。

  这次我们不再做同样事情N多次了,而是使用Stream API并行产生许多不同的任务,我们能创建一个带有值集合的流,基于它们有一个并行执行的函数,最后聚合结果(将数据收集到集合,然后reduce汇聚到一个值 map/reduce?),这次任务是计算首个45 斐波那契数字的总和:

public class Tester {

    public static void main(String[] args) {

        Stopwatch stopwatch = Stopwatch.createStarted();

        IntStream.range(1, 45).parallel().map(Tester::fib).sum();

        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");

 

        stopwatch.reset();

        stopwatch.start();

        IntStream.range(1, 45).map(Tester::fib).sum();

        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");

    }

 

    private static int fib(int n) {

        if (n == 1 || n == 2) {

            return 1;

        } else {

            return fib(n - 1) + fib(n - 2);

        }

    }

}

 

首先创建一个与我们所要运行的并行任务描述流。然后并行调用所有的函数。最后返回的所有结果的总和。可以抽象为:创造一个任意值的流(包括丰富的Java对象)并对其执行一个普通的操作。

输出结果:

Parallel took 3078 ms
Sequential took 7327 ms

 

数据表每天五千四百万数据,,如何汇总