public class CountTask extends RecursiveTask<Integer> {PRivate static final long serialVersionUID = -3454816350595604316L;private static final int threshold = 2; // 閾值private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool joinPool = new ForkJoinPool();//生成一個計算任務CountTask countTask = new CountTask(2, 266666);//執(zhí)行任務Future< Integer> future = joinPool.submit(countTask);System.out.println(future.get()); int cc = 2+3+4+5+6+7+8+9+10+11; System.out.println(cc);}@Overrideprotected Integer compute() {int sum = 0;// 如果任務足夠小就計算任務boolean cancompute = (end - start) <= threshold;if (cancompute) {for (int i = start; i <= end; i++) {sum = sum + i;}} else {// 如果任務大于閾值 ,就會分解成兩個小任務計算int middle = (start + end) / 2;CountTask left = new CountTask(start, middle);CountTask right = new CountTask(middle + 1, end);// 執(zhí)行子任務left.fork();right.fork();// 等待子任務執(zhí)行結束,并得到子任務結果int leftresult = left.join();int rightresult = right.join();sum = leftresult + rightresult;}return sum;}}
Fork/Join框架主要有以下兩個類組成. * ForkJoinPool 這個類實現(xiàn)了ExecutorService接口和工作竊取算法(Work-Stealing Algorithm).它管理工作者線程,并提供任務的狀態(tài)信息,以及任務的執(zhí)行信息 * ForkJoinTask 這個類是一個將在ForkJoinPool執(zhí)行的任務的基類.
Fork/Join框架提供了在一個任務里執(zhí)行fork()和join()操作的機制和控制任務狀態(tài)的方法.通常,為了實現(xiàn)Fork/Join任務,需要實現(xiàn)一個以下兩個類之一的子類 * RecursiveAction 用于任務沒有返回值的場景 * RecursiveTask 用于任務有返回值的場景.
fork-join框架
fork操作的作用是把一個大的問題劃分成若干個較小的問題。在這個劃分過程一般是遞歸進行的。直到可以直接進行計算。需要恰當?shù)剡x取子問題的大小。太大的子問題不利于通過并行方式來提高性能,而太小的子問題則會帶來較大的額外開銷。每個子問題計算完成后,可以得到關于整個問題的部分解。join操作的作用是把這些分解手機組織起來,得到完整解。
簡單的說,F(xiàn)orkJoin其核心思想就是分治。Fork分解任務,Join收集數(shù)據(jù)。

在fork/join框架中,若某個子問題由于等待另一個子問題的完成而無法繼續(xù)執(zhí)行。那么處理該子問題的線程會主動尋找其他尚未運行完成的子問題來執(zhí)行。這種方式減少了線程的等待時間,提高了性能。子問題中應該避免使用synchronized關鍵詞或其他方式方式的同步。也不應該是一阻塞IO或過多的訪問共享變量。在理想情況下,每個子問題的實現(xiàn)中都應該只進行CPU相關的計算,并且只適用每個問題的內部對象。唯一的同步應該只發(fā)生在子問題和創(chuàng)建它的父問題之間。

Fork/Join框架的主要類
一個fork/join框架之下的任務由ForkJoinTask類表示。ForkJoinTask實現(xiàn)了Future接口,可以按照Future接口的方式來使用。在ForkJoinTask類中之重要的兩個方法fork和join。fork方法用以一部方式啟動任務的執(zhí)行,join方法則等待任務完成并返回指向結果。在創(chuàng)建自己的任務是,最好不要直接繼承自ForkJoinTask類,而要繼承自ForkJoinTask類的子類RecurisiveTask或RecurisiveAction類。兩種的區(qū)別在于RecurisiveTask類表示的任務可以返回結果,而RecurisiveAction類不行。
簡單總結:
ForkJoin主要提供了兩個主要的執(zhí)行任務的接口。RecurisiveAction與RecurisiveTask 。
RecurisiveAction :沒有返回值的接口。RecurisiveTask :帶有返回值的接口。
fork/join框架任務的執(zhí)行由ForkJoinTask類的對象之外,還可以使用一般的Callable和Runnable接口來表示任務。
ForkJoin要利用線程池ForkJoinPool。每個線程池都有一個WorkQueue實例。ForkJoinPool推薦查看JDK8的源碼,比JDK7更利于理解。
在ForkJoinPool類的對象中執(zhí)行的任務大支可以分為兩類,一類通過execute、invoke或submit提交的任務;另一類是ForkJoinTask類的對象在執(zhí)行過程中產(chǎn)生的子任務,并通過fork方法來運行。一般的做法是表示整個問題的ForkJoinTask類的對象用第一類型是提交,而在執(zhí)行過程中產(chǎn)生的子任務并不需要進行處理,F(xiàn)orkJoinPool類對象會負責子任務的執(zhí)行。

ForkJoinPool是ExecutorService的實現(xiàn)類,因此是一種特殊的線程池。使用方法與Executor框架類似。ForkJoinPool提供如下兩個常用的構造器:
ForkJoinPool(int parallelism) 創(chuàng)建一個包含parallelism個并行線程的ForkJoinPool。
ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作為parallelism參數(shù)來創(chuàng)建ForkJoinPool。
ForkJoinPool有如下三個方法啟動線程:
使用ForkJoinPool的submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執(zhí)行指定任務。其中ForkJoinTask代表一個可以并行、合并的任務。
| | 客戶端非fork/join調用 | 內部調用fork/join |
| 異步執(zhí)行 | execute(ForkJoinTask) | ForkJoinTask.fork |
| 等待獲取結果 | invoke(ForkJoinTask) | ForkJoinTask.invoke |
| 執(zhí)行,獲取Future | submit(ForkJoinTask) | ForkJoinTask.fork(ForkJoinTask are Futures) |
ForkJoinTask是分支合并的執(zhí)行任何,分支合并的業(yè)務邏輯使用者可以再繼承了這個抽先類之后,在抽象方法exec()中實現(xiàn)。其中exec()的返回結果和ForkJoinPool的執(zhí)行調用方(execute(...),invoke(...),submit(...)),共同決定著線程是否阻塞,具體請看下面的測試用例。
ForkJoinTask 是一個抽象類,它還有兩個抽象子類:RecurisiveTask和RecurisiveAction。
RecurisiveTask代表有返回值的任務。RecursiveTask<T>是泛型類。T是返回值的類型。
RecurisiveAction代表沒有返回值的任務。
異常處理
ForkJoinTask在執(zhí)行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經(jīng)拋出異常或已經(jīng)被取消了,并且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:
if(task.isCompletedAbnormally()) { System.out.println(task.getException());}getException方法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。
工作竊取原理

例子 先定個小目標,1億就太多,先賺個一百萬吧
現(xiàn)在你是一個深圳片區(qū)的某公司高級銷售主管.現(xiàn)在定了一個目標,就是要賺個一百,讓你一個人去賺,肯定有難度的.好在有一般手下,把目標縮小,讓小弟們去賺,我們坐等拿錢.ok,開始編程
首先我們要定義個賺錢任務 MakeMoneyTask,如果要賺錢的目標小于最小目標,比如十萬,那么就自己去完成,否則,就把任務分給小弟們去做.public class MakeMoneyTask extends RecursiveTask<Integer>{ private static final int MIN_GOAL_MONEY = 100000; private int goalMoney; private String name; private static final AtomicLong employeeNo = new AtomicLong(); public MakeMoneyTask(int goalMoney){ this.goalMoney = goalMoney; this.name = "員工" + employeeNo.getAndIncrement() + "號"; } @Override protected Integer compute() { if (this.goalMoney < MIN_GOAL_MONEY){ System.out.println(name + ": 老板交代了,要賺 " + goalMoney + " 元,為了買車買房,加油吧...."); return makeMoney(); }else{ int subThreadCount = ThreadLocalRandom.current().nextInt(10) + 2; System.out.println(name + ": 上級要我賺 " + goalMoney + ", 有點小多,沒事讓我" + subThreadCount + "個手下去完成吧," + "每人賺個 " + Math.ceil(goalMoney * 1.0 / subThreadCount) + "元應該沒問題..."); List<MakeMoneyTask> tasks = new ArrayList<>(); for (int i = 0; i < subThreadCount; i ++){ tasks.add(new MakeMoneyTask(goalMoney / subThreadCount)); } Collection<MakeMoneyTask> makeMoneyTasks = invokeAll(tasks); int sum = 0; for (MakeMoneyTask moneyTask : makeMoneyTasks){ try { sum += moneyTask.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(name + ": 嗯,不錯,效率還可以,終于賺到 " + sum + "元,趕緊邀功去...."); return sum; } } private Integer makeMoney(){ int sum = 0; int day = 1; try { while (true){ Thread.sleep(ThreadLocalRandom.current().nextInt(500)); int money = ThreadLocalRandom.current().nextInt(MIN_GOAL_MONEY / 3); System.out.println(name + ": 在第 " + (day ++) + " 天賺了" + money); sum += money; if (sum >= goalMoney){ System.out.println(name + ": 終于賺到 " + sum + " 元, 可以交差了..."); break; } } } catch (InterruptedException e) { e.printStackTrace(); } return sum; }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657最后我們寫一個測試類public class TestMain { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> task = pool.submit(new MakeMoneyTask(1000000)); do { try { TimeUnit.MILLISECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); } }while (!task.isDone()); pool.shutdown(); System.out.println(task.get()); }}