仔細看了Doug Lea的那篇文章:A Java Fork/Join Framework 中關于Work-Stealing的部分,下面列出該算法的要點(基本是原文的翻譯):
1. 每個Worker線程都維護一個任務隊列,即ForkJoinWorkerThread中的任務隊列。
2. 任務隊列是雙向隊列,這樣可以同時實現LIFO和FIFO。
3. 子任務會被加入到原先任務所在Worker線程的任務隊列。
4. Worker線程用LIFO的方法取出任務,也就后進隊列的任務先取出來(子任務總是后加入隊列,但是需要先執行)。
5. Worker線程的任務隊列為空,會隨機從其他的線程的任務隊列中拿走一個任務執行(所謂偷任務:steal work,FIFO的方式)。
6. 如果一個Worker線程遇到了join操作,而這時候正在處理其他任務,會等到這個任務結束。否則直接返回。
7. 如果一個Worker線程偷任務失敗,它會用yield或者sleep之類的方法休息一會兒,再嘗試偷任務(如果所有線程都是空閑狀態,即沒有任務運行,那么該線程也會進入阻塞狀態等待新任務的到來)。
那么重新回到ForkJoinPool的scan方法
PRivate boolean scan(ForkJoinWorkerThread w, int a) { // scanGuard是32位的整數,用于worker線程數組的索引 // 第16位稱為SG_UNIT,為1表示鎖住 // 0到15位是mask int g = scanGuard; // parallelism表示并發數,一般指CPU可以同時運行的線程數 // 默認值是Runtime類的availableProcessors方法返回值,表示 // 處理器的數量 // a是活躍的Worker線程的數量,parallelism是大于0的,因此 // 條件parallelism == 1 - a滿足意味著parallelism為1而a為0 // 而加上blockedCount為0(意味著沒有線程因為join被阻塞), // 兩個條件同時滿足也就意味既沒有任何線程在運行,那么也就 // 意味著沒有任務存在于worker線程,所以m=0也就是沒法偷任務 // SMASK=0xffff,g & SMASK返回的值scanGuard的0到15位的數值 int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) return false; // for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; // 從線程隊列中隨機獲取一個worker線程 ForkJoinWorkerThread v = ws[k & m]; // 判斷Worker線程是否存在以及該線程的任務隊列是否有任務 if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { // 從隊列中偷走一個任務 long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; // d是偷走一個任務后任務隊列的長度 if (d != 0) signalWork(); w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); // false表示掃描到了任務 return false; } else if (j < 0) { // 異或移位,更新k r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } // 如果掃描不到任務,但是scanGuard被更新了,說明有任務的變化 if (scanGuard != g) return false; else { // 從線程池的任務隊列中取出任務來執行 ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; } }
scan方法的作用就是從其他線程的任務隊列中偷任務。
新聞熱點
疑難解答