

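  JDK 7 has been released, and most people have heard of Fork/Join, but few have had the time or opportunity to try it in their day-to-day work. How does it differ from an ordinary thread pool?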


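import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskProcessorPool extends AbstractTaskProcessor {

    /** 4 threads for a 4 core / 4 threads system */
    public static final int POOL_SIZE = 4;

    /** Thread pool */
    private final ExecutorService threadPool;

    public TaskProcessorPool(TaskSolver taskSolver) {
        super(taskSolver); // assumed: the base class stores the solver
        threadPool = Executors.newFixedThreadPool(POOL_SIZE);
    }

    public void process(Task task) {
        // As soon as we have a task we submit it to the threadpool
        threadPool.submit(new TaskRunnable(task));
    }

    public void shutdown() {
        // Body missing in the source; presumably it stops the pool
        threadPool.shutdown();
    }

    /** Runnable solver for a task */
    class TaskRunnable implements Runnable {

        /** Task to be solved */
        private final Task task;

        public TaskRunnable(Task task) {
            this.task = task;
        }

        public void run() {
            // Body missing in the source; presumably it solves the whole task, e.g.:
            taskSolver.solve(task, 0, task.getOperations().size());
        }
    }
}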
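To make the fixed-pool approach concrete, here is a minimal self-contained sketch that does not depend on the `Task` and `TaskSolver` classes above. The class `FixedPoolSum` and its `sum` method are hypothetical names invented for this illustration. The work (summing an array) is cut into one chunk per thread up front, and each chunk is handed to the pool as a `Callable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FixedPoolSum {

    // Illustrative pool size, matching the 4-thread pool used in the article
    static final int POOL_SIZE = 4;

    /** Sums the array by splitting it into POOL_SIZE chunks up front. */
    static long sum(final long[] data) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            List<Callable<Long>> chunks = new ArrayList<Callable<Long>>();
            // Round up so that every element falls into some chunk
            int chunkSize = (data.length + POOL_SIZE - 1) / POOL_SIZE;
            for (int start = 0; start < data.length; start += chunkSize) {
                final int from = start;
                final int to = Math.min(start + chunkSize, data.length);
                chunks.add(new Callable<Long>() {
                    @Override
                    public Long call() {
                        long s = 0;
                        for (int i = from; i < to; i++) {
                            s += data[i];
                        }
                        return s;
                    }
                });
            }
            long total = 0;
            // invokeAll blocks until every chunk has completed
            for (Future<Long> f : pool.invokeAll(chunks)) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long[] data = new long[1000000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data));
    }
}
```

The chunking here is static: if one chunk turns out to be slower than the others, the remaining threads simply sit idle once they finish their own share.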





Fork Join

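  Fork/Join is only useful when a task can be split into smaller subtasks. The fork-join pool is a work-stealing thread pool: each worker thread maintains its own local task deque (double-ended queue), and a worker that runs out of local tasks steals one from another worker's deque.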



   Doug Lea's original paper explains this in more detail: http://gee.cs.oswego.edu/dl/papers/fj.pdf
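As a rough illustration of the fork/join pattern itself, the following sketch sums an array with a `RecursiveTask`. It is not taken from the article: the class `ForkJoinSum`, the nested `SumTask`, and the `THRESHOLD` value are all hypothetical names and numbers chosen for the example. Ranges below the threshold are summed directly; larger ranges are halved, with one half forked so that an idle worker can steal it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSum {

    // Hypothetical threshold: ranges shorter than this are summed sequentially
    static final int THRESHOLD = 10_000;

    static class SumTask extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;

        private final long[] data;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < THRESHOLD) {
                // Small enough: do the work on the current thread
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                      // queue the left half; another worker may steal it
            long rightSum = right.compute();  // work on the right half in this thread
            return left.join() + rightSum;    // wait for the left half and combine
        }
    }

    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data)); // prints 500000500000
    }
}
```

Forking one half and computing the other in place keeps the current worker busy instead of parking it while both halves run elsewhere.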


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * This is a Fork / Join processor for tasks, using a ForkJoinPool.
 * At the processing step it will split the task into subtasks until it reaches a threshold.
 * Once the threshold is reached it will solve the subtask.
 *
 * @author florin.bunau
 */
public class TaskProcessorFJ extends AbstractTaskProcessor {

    /** Work stealing thread pool implementation */
    private final ForkJoinPool forkJoinPool;

    public TaskProcessorFJ(TaskSolver taskSolver) {
        super(taskSolver); // assumed: the base class stores the solver
        forkJoinPool = new ForkJoinPool();
    }

    public void process(Task task) {
        // Give the pool a subtask to solve. At the beginning task == subtask
        forkJoinPool.invoke(new Subtask(task, 0, task.getOperations().size(), true));
    }

    /**
     * Wraps a task to be solved, and restricts its extent to a subtask.
     * Also implements a FJ recursive action that will split the subtask into two smaller subtasks
     * if it's above a specified threshold.
     */
    private class Subtask extends RecursiveAction {

        private static final long serialVersionUID = 1L;

        /** Task to be solved */
        final Task task;
        /** Limit the task from operation having index 'from' */
        final int from;
        /** to operation having index 'to' */
        final int to;
        /** Is this subtask == the initial task */
        final boolean rootTask;

        public Subtask(Task task, int from, int to, boolean rootTask) {
            this.task = task;
            this.from = from;
            this.to = to;
            this.rootTask = rootTask;
        }

        @Override
        protected void compute() {
            // If the subtask size is smaller than an L task ( XS, S, M ), then go ahead and solve it
            if (to - from < Task.TaskType.L.getRange()) {
                taskSolver.solve(task, from, to);
            }
            // If it's the same or larger ( L, XL, XXL ), break it into two smaller subtasks and solve those
            else {
                int mid = from + (to - from) / 2;
                // The second half starts at 'mid', assuming 'to' is exclusive (the root call passes
                // size()); the original 'mid + 1' would then skip the operation at index 'mid'.
                invokeAll(new Subtask(this.task, from, mid, false),
                          new Subtask(this.task, mid, to, false));
            }
            // We're done solving the subtask; if it's the initial task, signal that the task is done
            if (rootTask) {
                // Completion signal missing in the source
            }
        }
    }
}

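   For example (bearing in mind that splitting a task too finely costs more in scheduling and context switches than actually executing it): for a large XXL task we need to perform 1,000,000 query operations. We can split it into two tasks of 500,000 operations each and run them in parallel. Is 500,000 still large? Yes, so we can split again. I chose a group of 10,000 operations as the threshold; below that, further splitting is pointless and we simply execute the operations directly on the current thread. Fork/Join does not do this up-front splitting work for you; the task itself has to decide when and how to split.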
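The arithmetic of this splitting can be checked with a short sketch. The class `SplitCount` and its methods are hypothetical helpers written for this illustration; they assume half-open ranges `[from, to)` and the same "solve directly when smaller than the threshold" rule described above. For 1,000,000 operations and a threshold of 10,000, the range is halved seven times (1,000,000 → 500,000 → 250,000 → 125,000 → 62,500 → 31,250 → 15,625 → about 7,812), which yields 128 leaf subtasks.

```java
public class SplitCount {

    /** Number of leaf subtasks produced by halving [from, to) until a range is below the threshold. */
    static int countLeaves(int from, int to, int threshold) {
        if (to - from < threshold) {
            return 1; // small enough: this range is solved directly
        }
        int mid = from + (to - from) / 2;
        return countLeaves(from, mid, threshold) + countLeaves(mid, to, threshold);
    }

    /** Number of split levels between the root range and the deepest leaf. */
    static int depth(int from, int to, int threshold) {
        if (to - from < threshold) {
            return 0;
        }
        int mid = from + (to - from) / 2;
        return 1 + Math.max(depth(from, mid, threshold), depth(mid, to, threshold));
    }

    public static void main(String[] args) {
        System.out.println(countLeaves(0, 1_000_000, 10_000)); // prints 128
        System.out.println(depth(0, 1_000_000, 10_000));       // prints 7
    }
}
```

With 128 leaves of roughly 7,800 operations each, there are far more subtasks than worker threads, which gives work stealing room to balance the load.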


TaskProcessorPool: 3933
TaskProcessorPool: 2906
TaskProcessorPool: 4477
TaskProcessorPool: 4160

TaskProcessorFJ: 2498
TaskProcessorFJ: 2498
TaskProcessorFJ: 2524
TaskProcessorFJ: 2511


