使用Java 7.0的 Fork/Join框架进行并发编程

Concurrent programming with Fork/Join framework in Java 7.0

fork/join框架主要是利用多核CPU并发,而之前介绍的Disruptor主要针对单核并发。一般单核情况下,因为运算逻辑的顺序性,无法挖掘CPU潜力,CPU利用率比较低,如文中单核图:

而使用了fork/join框架+多核CPU,CPU利用率就上来了:

其实这篇文章没有回答一个问题,如何在单核情况下提高CPU利用率,我认为使用Disruptor。

fork/join基于divide-and-conquer算法,适合将一个大任务划分为多个小任务,让多个CPU执行,然后再合并结果,如下图。如果你不要合并结果,可选方案就很多。

fork/join类似MapReduce算法,两者区别是:Fork/Join 只有在必要时如任务非常大的情况下才分割成一个个小任务,而 MapReduce总是在开始执行第一步进行分割。看来,Fork/Join更适合一个JVM内线程级别,而MapReduce适合分布式系统。

文章案例,需要切割的大任务:产生大量整数。


package mrbool.com;
import java.util.Random;
// This class defines a list which will contain large number of integers.
public class LargeInteger {
private final int[] list = new int[2000000];
public largeInteger() {
Random generator = new Random(19580427);
for (int i = 0; i < list.length; i++) {
list[i] = generator.nextInt(500000);
}
}
public int[] getList() {
return list;
}
}

执行fork/join工作类,将原来需要2百万的整数任务分成两个一百万分别产生:


package mrbool.com;
import java.util.Arrays;
import jsr166y.forkjoin.RecursiveAction;
public class SplitTask extends RecursiveAction {
private int[] list;
public long result;
public SplitTask(int[] array) {
this.list = array;
}
@Override
protected void compute() {
if (list.length == 1) {
result = list[0];
} else {
int midpoint = list.length / 2;
int[] l1 = Arrays.copyOfRange(list, 0, midpoint);
int[] l2 = Arrays.copyOfRange(list, midpoint, list.length);
SplitTask s1 = new SplitTask(l1);
SplitTask s2 = new SplitTask(l2);
forkJoin(s1, s2);//关键
result = s1.result + s2.result;
//合并结果
}
}
}

交由专门的ForkJoinExecutor 执行,ForkJoinExecutor 和普通Executor 框架,ForkJoinExecutor使用了work-stealing 算法,辅助线程可以从其他还在忙着的线程窃取任务。


import jsr166y.forkjoin.ForkJoinExecutor;
import jsr166y.forkjoin.ForkJoinPool;
import javablog.levent.com.SplitTask;
public class TestForkJoin {
public static void main(String[] args) {
LargeInteger test = new LargeInteger();
// Check the number of available processors
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
SplitTask mfj = new SplitTask(test.getList());
ForkJoinExecutor pool = new ForkJoinPool(nThreads);
pool.invoke(mfj);
long result = mfj.getResult();
System.out.println(
"Done. Result: " + result);
}
}

总结:Fork/Join框架是专门对付那些逻辑上强顺序性强一致性的运算场景,在一些无强一致性场合,我们可以通过简单的多线程(Disruptor)就可以解决,比如你去买A,看到A排队很长,那么你可以改变自己的主意,先去买B。而如果你不想改变主意,执意排队A,那么Fork/Join框架无疑是个利器,能够将A队强行分割成几个小分队,你变成“城管”了。