使用Java 7.0的 Fork/Join框架进行并发编程

13-05-02 banq
Concurrent programming with Fork/Join framework in Java 7.0

fork/join框架主要是利用多核CPU并发,而之前介绍的Disruptor主要针对单核并发。一般单核情况下,因为运算逻辑的顺序性,无法挖掘CPU潜力,CPU利用率比较低,如文中单核图:

而使用了fork/join框架+多核CPU,CPU利用率就上来了:

其实这篇文章没有回答一个问题,如何在单核情况下提高CPU利用率,我认为使用Disruptor。

fork/join基于divide-and-conquer算法,适合将一个大任务划分为多个小任务,让多个CPU执行,然后再合并结果,如下图。如果你不要合并结果,可选方案就很多。

fork/join类似MapReduce算法,两者区别是:Fork/Join 只有在必要时如任务非常大的情况下才分割成一个个小任务,而 MapReduce总是在开始执行第一步进行分割。看来,Fork/Join更适合一个JVM内线程级别,而MapReduce适合分布式系统。

文章案例,需要切割的大任务:产生大量整数。

package mrbool.com;
import java.util.Random;
// This class defines a list which will contain large number of integers.
public class LargeInteger {
    private final int[] list = new int[2000000];
    public largeInteger() {
        Random generator = new Random(19580427);
        for (int i = 0; i < list.length; i++) {
            list[i] = generator.nextInt(500000);
        }
    }
    public int[] getList() {
        return list;
    }
}

<p>

执行fork/join工作类,将原来需要2百万的整数任务分成两个一百万分别产生:

package mrbool.com;
import java.util.Arrays;
import jsr166y.forkjoin.RecursiveAction;
public class SplitTask extends RecursiveAction {
    private int[] list;
    public long result;
    public SplitTask(int[] array) {
        this.list = array;
    }
    @Override
    protected void compute() {
        if (list.length == 1) {
            result = list[0];
        } else {
            int midpoint = list.length / 2;
            int[] l1 = Arrays.copyOfRange(list, 0, midpoint);
            int[] l2 = Arrays.copyOfRange(list, midpoint, list.length);
            SplitTask s1 = new SplitTask(l1);
            SplitTask s2 = new SplitTask(l2);
            forkJoin(s1, s2);//关键
            result = s1.result + s2.result;//合并结果
        }
    }
}

<p>

交由专门的ForkJoinExecutor 执行,ForkJoinExecutor 和普通Executor 框架,ForkJoinExecutor使用了work-stealing 算法,辅助线程可以从其他还在忙着的线程窃取任务。

import jsr166y.forkjoin.ForkJoinExecutor;
import jsr166y.forkjoin.ForkJoinPool;
import javablog.levent.com.SplitTask;
public class TestForkJoin {
    public static void main(String[] args) {
        LargeInteger test = new LargeInteger();
        // Check the number of available processors
        int nThreads = Runtime.getRuntime().availableProcessors();
        System.out.println(nThreads);
        SplitTask mfj = new SplitTask(test.getList());
        ForkJoinExecutor pool = new ForkJoinPool(nThreads);
        pool.invoke(mfj);
        long result = mfj.getResult();
        System.out.println("Done. Result: " + result);     
    }
}

<p>

总结:Fork/Join框架是专门对付那些逻辑上强顺序性强一致性的运算场景,在一些无强一致性场合,我们可以通过简单的多线程(Disruptor)就可以解决,比如你去买A,看到A排队很长,那么你可以改变自己的主意,先去买B。而如果你不想改变主意,执意排队A,那么Fork/Join框架无疑是个利器,能够将A队强行分割成几个小分队,你变成“城管”了。

              

1
猜你喜欢