Concurrent programming with Fork/Join framework in Java 7.0
fork/join框架主要是利用多核CPU并发,而之前介绍的Disruptor主要针对单核并发。一般单核情况下,因为运算逻辑的顺序性,无法挖掘CPU潜力,CPU利用率比较低,如文中单核图:
而使用了fork/join框架+多核CPU,CPU利用率就上来了:
其实这篇文章没有回答一个问题,如何在单核情况下提高CPU利用率,我认为使用Disruptor。
fork/join基于divide-and-conquer算法,适合将一个大任务划分为多个小任务,让多个CPU执行,然后再合并结果,如下图。如果你不要合并结果,可选方案就很多。
fork/join类似MapReduce算法,两者区别是:Fork/Join 只有在必要时如任务非常大的情况下才分割成一个个小任务,而 MapReduce总是在开始执行第一步进行分割。看来,Fork/Join更适合一个JVM内线程级别,而MapReduce适合分布式系统。
文章案例,需要切割的大任务:产生大量整数。
package mrbool.com; import java.util.Random; // This class defines a list which will contain large number of integers. public class LargeInteger { private final int[] list = new int[2000000]; public largeInteger() { Random generator = new Random(19580427); for (int i = 0; i < list.length; i++) { list[i] = generator.nextInt(500000); } } public int[] getList() { return list; } }
|
执行fork/join工作类,将原来需要2百万的整数任务分成两个一百万分别产生:
package mrbool.com; import java.util.Arrays; import jsr166y.forkjoin.RecursiveAction; public class SplitTask extends RecursiveAction { private int[] list; public long result; public SplitTask(int[] array) { this.list = array; } @Override protected void compute() { if (list.length == 1) { result = list[0]; } else { int midpoint = list.length / 2; int[] l1 = Arrays.copyOfRange(list, 0, midpoint); int[] l2 = Arrays.copyOfRange(list, midpoint, list.length); SplitTask s1 = new SplitTask(l1); SplitTask s2 = new SplitTask(l2); forkJoin(s1, s2);//关键 result = s1.result + s2.result;//合并结果 } } }
|
交由专门的ForkJoinExecutor 执行,ForkJoinExecutor 和普通Executor 框架,ForkJoinExecutor使用了work-stealing 算法,辅助线程可以从其他还在忙着的线程窃取任务。
import jsr166y.forkjoin.ForkJoinExecutor; import jsr166y.forkjoin.ForkJoinPool; import javablog.levent.com.SplitTask; public class TestForkJoin { public static void main(String[] args) { LargeInteger test = new LargeInteger(); // Check the number of available processors int nThreads = Runtime.getRuntime().availableProcessors(); System.out.println(nThreads); SplitTask mfj = new SplitTask(test.getList()); ForkJoinExecutor pool = new ForkJoinPool(nThreads); pool.invoke(mfj); long result = mfj.getResult(); System.out.println("Done. Result: " + result); } }
|
总结:Fork/Join框架是专门对付那些逻辑上强顺序性强一致性的运算场景,在一些无强一致性场合,我们可以通过简单的多线程(Disruptor)就可以解决,比如你去买A,看到A排队很长,那么你可以改变自己的主意,先去买B。而如果你不想改变主意,执意排队A,那么Fork/Join框架无疑是个利器,能够将A队强行分割成几个小分队,你变成“城管”了。