首先看一下ForkJoin相关的几个API
- ForkJoinPool
实现了forkjoin的线程池
- ForkJoinWorkerThread
forkjoin的线程
- ForkJoinTask
forkjoin任务的父类 这是一个抽象类
- RecursiveAction
无返回结果的任务接口
- RecursiveTask
有返回结果的任务接口
ForkJoinPool继承自AbstractExecutorService类 说明了 ForkJoinPool和ThreadPoolExecutor
差不多是同父的兄弟类 ,因为ThreadPoolExecutor也继承自AbstractExecutorService类。
@sun.misc.Contended public class ForkJoinPool extends AbstractExecutorService
|
下面以一个简单的例子来说明一下使用方法
定义一个task实现类
public class Calculator extends RecursiveTask<Integer> { private static final int THRESHOLD = 10; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if((end - start) < THRESHOLD){ sum=calSingle(); }else{ int middle = (start + end) /2; Calculator task1 = new Calculator(start, middle); Calculator task2 = new Calculator(middle + 1, end); task2.fork(); int i=task2.join(); int i2=task1.invoke();
sum=i+i2; } return sum; } private int calSingle(){ int sum=0; for(int i = start; i<= end;i++){ sum += i; } return sum; } }
|
测试main函数类
public class TestMain { public static void main(String[] args) throws ExecutionException, InterruptedException {
Calculator calculator=new Calculator(0,500);
ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> f=pool.submit(calculator);
long start=System.currentTimeMillis();
System.out.println(System.currentTimeMillis()-start+"MS"); do{ if(calculator.isCompletedNormally()) { System.out.println("计算完成 正在关闭Fork/Join池..."); pool.shutdown(); } }while(!calculator.isDone()); System.out.println("计算结果为:"+f.get()); System.out.println("线程已经从另一个线程偷取到的时间数:"+pool.getStealCount()); System.out.println("是否已经完成执行:"+pool.isTerminated()); System.out.println("并行的级别:"+pool.getParallelism()); System.out.println("线程池的worker线程的数量:"+pool.getPoolSize());
System.out.println("执行的任务数:"+pool.getQueuedTaskCount()); } }
|
需要注意的是Fork/Join的为了充分减少等待时间 默认使用的是LIFO策略,所以我们在执行第一个任务的
时候尽量不要fork。具体原因还不是很理解,因为在invokeAll方法中也是这样子处理的。invokeAll源码如下
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); }
|
首先Fork第二个任务。然后在执行第一个任务,其次是join第二个任务。