接下來看看調用ForkJoinTask的join方法都發生了什么:
public final V join() { // doJoin方法返回該任務的狀態,狀態值有三種: // NORMAL, CANCELLED和EXCEPTIONAL // join的等待過程在doJoin方法中進行 if (doJoin() != NORMAL) // reportResult方法針對任務的三種狀態有三種處理方式: // NORMAL: 直接返回getRawResult()方法的返回值 // CANCELLED: 拋出CancellationException // EXCEPTIONAL: 如果任務執行過程拋出了異常,則拋出該異常,否則返回getRawResult() return reportResult(); else // getRawResult是抽象方法,由子類來實現 return getRawResult(); }RecursiveAction和RecursiveTask實現了getRawResult方法。
RecursiveAction用于沒有返回值的場合,因此getRawResult方法返回null。
RecursiveTask用于有返回值的場合,因此返回的是抽象方法compute方法的返回值。
接下來繼續看join的核心方法doJoin方法:
PRivate int doJoin() { Thread t; ForkJoinWorkerThread w; int s; boolean completed; // 針對ForkJoinWorkerThread調用join的情況 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // status值的初始化值是0,在任務沒有完成以前一直是非負值 // 因此一旦status的值變成負數,表示任務已經完成,直接返回 if ((s = status) < 0) return s; // 檢查當前worker線程的任務棧(因為采用LIFO方式,所有這里稱為棧) // 的棧頂的任務是不是當前任務,如果是,從棧中取走該任務并執行 // 然后返回執行之后任務的狀態 if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) return setCompletion(NORMAL); } // 如果不是棧頂任務的情況 return w.joinTask(this); } else // 外部線程等待任務結束的情況 return externalAwaitDone(); }
前面文章中曾經舉了幾個例子來演示如何實現RecursiveTask的子類。在compute方法中會看到了join方法的調用,也就是ForkJoinWorkerThread調用join的情況。
因此首先來看ForkJoinWorkerThread的joinTask方法的實現:
final int joinTask(ForkJoinTask<?> joinMe) { ForkJoinTask<?> prevJoin = currentJoin; currentJoin = joinMe; for (int s, retries = MAX_HELP;;) { // 當前任務已經完成,返回到前面一個join的任務 if ((s = joinMe.status) < 0) { currentJoin = prevJoin; return s; } // 剩余的嘗試次數大于0(MAX_HELP值為16)的情況,繼續做嘗試 if (retries > 0) { if (queueTop != queueBase) { // 檢查當前線程的任務棧,如果任務棧不為空,當前任務處在棧頂位置則 // 執行該任務返回true,否則返回false,直接認為嘗試失敗 if (!localHelpJoinTask(joinMe)) retries = 0; } // 嘗試了最大允許次數的一半 else if (retries == MAX_HELP >>> 1) { --retries; // 檢查當前任務是否在某個worker線程的任務隊列的隊首位置 // 如果是的話,偷走這個任務并且執行掉該任務。tryDeqAndExec // 返回任務的status值,因此大于等于0意味著任務還沒有執行結束, // 當前線程讓出控制權以便其他線程執行任務 if (tryDeqAndExec(joinMe) >= 0) Thread.yield(); } else // helpJoinTask方法檢查當前任務是不是被某個Worker線程偷走了, // 并且是這個線程最新偷走的任務(currentSteal),如果是的話, // 當前線程幫助執行這個任務,這個過程成功則返回true retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1; } else { // 嘗試了最大允許次數還沒有成功,重置以便再次嘗試 retries = MAX_HELP; // 一輪嘗試失敗,進入進程池等待任務 pool.tryAwaitJoin(joinMe); } } }來看一輪嘗試失敗之后,調用線程池的tryAwaitJoin方法會發生一些什么:
final void tryAwaitJoin(ForkJoinTask<?> joinMe) { int s; // 檢查任務是否結束之前先清除當前線程的中斷狀態 // 因為tryAwaitDone會調用wait可能產生中斷異常 Thread.interrupted(); // 任務還在執行的情況,否則執行完成就直接返回 if (joinMe.status >= 0) { // blockedCount加1,把當前線程標記為阻塞 // 成功則返回true,否則返回false if (tryPreBlock()) { // 調用wait方法等待任務完成 joinMe.tryAwaitDone(0L); // blockedCount減1,把當前線程標記為活躍狀態 postBlock(); } // 線程處于關閉狀態的情況,取消該任務 else if ((ctl & STOP_BIT) != 0L) joinMe.cancelIgnoringExceptions(); } }最后又回歸到了原點,來看task的tryAwaitDone方法:
final void tryAwaitDone(long millis) { int s; try { // status為0,設為1。成功了然后才會用wait等待 if (((s = status) > 0 || (s == 0 && UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) && status > 0) { synchronized (this) { if (status > 0) wait(millis); } } } catch (InterruptedException ie) { // 因為wait被中斷了,不能保證任務被正確執行結束,因此調用該方法時要注意 // 檢查任務是否已經執行結束了 }走完了Worker線程內的join的流程,最后來看其他線程join等待發生了什么,來看externalAwaitDone方法:
private int externalAwaitDone() { int s; if ((s = status) >= 0) { boolean interrupted = false; synchronized (this) { // 循環等待直到任務執行結束 while ((s = status) >= 0) { if (s == 0) UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL); else { try { wait(); } catch (InterruptedException ie) { interrupted = true; } } } } // 清除中斷狀態 if (interrupted) Thread.currentThread().interrupt(); } return s; }externalAwaitDone邏輯較為簡單,采用循環的方式,使用wait方法等待直到任務執行結束。
既然使用wait方法等待,那么必然在任務執行結束后需要調用notify或者notifyAll的方法,在setCompletion方法找到了:
private int setCompletion(int completion) { for (int s;;) { if ((s = status) < 0) return s; if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) { if (s != 0) synchronized (this) { notifyAll(); } return completion; } } }到這里把Fork/Join框架簡單地講完了,因為水平所限,遺漏了很多的細節,各位見諒。
新聞熱點
疑難解答