首先看一下ForkJoin相关的几个API

  • ForkJoinPool
    实现了forkjoin的线程池
  • ForkJoinWorkerThread
    forkjoin的线程
  • ForkJoinTask
    forkjoin任务的父类 这是一个抽象类
  • RecursiveAction
    无返回结果的任务接口
  • RecursiveTask
    有返回结果的任务接口

ForkJoinPool继承自AbstractExecutorService类 说明了 ForkJoinPool和ThreadPoolExecutor
差不多是同父的兄弟类 ,因为ThreadPoolExecutor也继承自AbstractExecutorService类。

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService

下面以一个简单的例子来说明一下使用方法

定义一个task实现类

public class Calculator extends RecursiveTask<Integer> {

private static final int THRESHOLD = 10;
private int start;
private int end;

public Calculator(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if((end - start) < THRESHOLD){
sum=calSingle();
}else{
int middle = (start + end) /2;
Calculator task1 = new Calculator(start, middle);
Calculator task2 = new Calculator(middle + 1, end);
//首先拆分任务
task2.fork();
int i=task2.join();
int i2=task1.invoke();
// invokeAll(task1,task2);
// int i=task1.getRawResult();
// int i2=task2.getRawResult();
//聚合
sum=i+i2;
}
return sum;
}
private int calSingle(){
int sum=0;
for(int i = start; i<= end;i++){
sum += i;
}
return sum;
}

}

测试main函数类

public class TestMain {
public static void main(String[] args) throws ExecutionException, InterruptedException {

Calculator calculator=new Calculator(0,500);

ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> f=pool.submit(calculator);

long start=System.currentTimeMillis();

System.out.println(System.currentTimeMillis()-start+"MS");
do{
if(calculator.isCompletedNormally()) {
System.out.println("计算完成 正在关闭Fork/Join池...");
pool.shutdown();
}
}while(!calculator.isDone());
System.out.println("计算结果为:"+f.get());
System.out.println("线程已经从另一个线程偷取到的时间数:"+pool.getStealCount());
System.out.println("是否已经完成执行:"+pool.isTerminated());
System.out.println("并行的级别:"+pool.getParallelism());
System.out.println("线程池的worker线程的数量:"+pool.getPoolSize());

System.out.println("执行的任务数:"+pool.getQueuedTaskCount());
}
}

需要注意的是Fork/Join的为了充分减少等待时间 默认使用的是LIFO策略,所以我们在执行第一个任务的
时候尽量不要fork。具体原因还不是很理解,因为在invokeAll方法中也是这样子处理的。invokeAll源码如下

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}

首先Fork第二个任务。然后在执行第一个任务,其次是join第二个任务。