在寫前面兩篇文章23和24的時(shí)候自己有很多細(xì)節(jié)搞得不是很明白,這篇文章把Fork和Work-Stealing相關(guān)的源代碼重新梳理一下。
首先來看一些線程池定義的成員變量:
關(guān)于scanGuard:
volatile int scanGuard;PRivate static final int SG_UNIT = 1 << 16;private static final int SMASK = 0xffff;
scanGuard低位16位數(shù)值(0到15位)始終等于2的N次方減去1,代表的是大于Worker線程數(shù)的最小的2的N次方減去1。因此每次要取低16位數(shù)據(jù)時(shí)都要用到SMASK。
scanGuard的第16位是一個(gè)標(biāo)志位,被當(dāng)成是一個(gè)更新worker線程數(shù)組的鎖使用。當(dāng)該位的數(shù)據(jù)是1時(shí),表示worker線程數(shù)組被鎖住,其他線程無法更新worker線程。
要更新第16位的數(shù)值,就需要用到SG_UNIT。
再來說說與任務(wù)隊(duì)列有關(guān)的三個(gè)變量:
// 存儲(chǔ)任務(wù)的數(shù)組,長(zhǎng)度是2的N次方ForkJoinTask<?>[] queue;// 最后一個(gè)元素?cái)?shù)組下標(biāo)+1// 如果把數(shù)組看成是隊(duì)列,那么該位置就是隊(duì)列尾部(FIFO添加元素)// 如果看成是棧,那么該位置就棧頂(LIFO拿走元素)// 只能當(dāng)前線程會(huì)使用這個(gè)數(shù)值,不存在多線程問題,因此不用volatileint queueTop;// 第一個(gè)元素的數(shù)組下標(biāo)// 也就是隊(duì)列的頭部的位置,從隊(duì)列中拿走元素時(shí),該數(shù)值加1// 其他線程偷任務(wù)(FIFO方式)時(shí)會(huì)更新這個(gè)變量,因此需要volatilevolatile int queueBase;
任務(wù)隊(duì)列的設(shè)計(jì)和Work-Stealing要求的一致(支持LIFO和FIFO)。
下面是scan方法源代碼解析(補(bǔ)充了一些細(xì)節(jié)):
private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // parallelism表示并發(fā)數(shù),一般等于CPU可以同時(shí)運(yùn)行的線程數(shù), // 默認(rèn)值是Runtime類的availableProcessors方法返回值,表示 // 處理器的數(shù)量,因此parallelism大于0。 // a是活躍的Worker線程數(shù),肯定大于等于0,因此 // 條件parallelism == 1 - a滿足意味著parallelism為1而a為0。 // 也就是當(dāng)前沒有Worker線程在執(zhí)行任務(wù)。blockedCount為0意味 // 著沒有線程因?yàn)閖oin被阻塞。 // 兩個(gè)條件同時(shí)滿足也就意味既沒有任何線程在運(yùn)行,那么也就 // 意味著不可能有任務(wù)存放于worker線程,所以m=0,也就是沒 // 法偷任務(wù)。 // g & SMASK返回的值scanGuard的0到15位的數(shù)值(一個(gè)2的N次方減去1的值) int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) return false; // 偷任務(wù) for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; // 從線程隊(duì)列中隨機(jī)獲取一個(gè)worker線程 ForkJoinWorkerThread v = ws[k & m]; // v!=null表示隨機(jī)索引的線程存在 // queueBase不等于queueTop表示線程的任務(wù)隊(duì)列不為空 // v.queue不為null表示任務(wù)隊(duì)列已經(jīng)被初始化 // (q.length - 1) 同樣是2的N次方減一,和b相與得到一個(gè) // 在數(shù)組長(zhǎng)度范圍內(nèi)的數(shù)組下標(biāo) // 這一串判斷是為了確認(rèn)找到了一個(gè)有任務(wù)的線程來偷任務(wù) if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { // u是計(jì)算Unsafe的索引,用以CAS操作 long u = (i << ASHIFT) + ABASE; // (t = q[i]) != null用以判斷數(shù)組該位置存有任務(wù) // v.queueBase == b為了確認(rèn)沒有線程拿走任務(wù) // CAS操作把該數(shù)組元素設(shè)為null表示拿走任務(wù) if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { //v.queueBase = b + 1更新隊(duì)列頭部位置 int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; // d是偷走一個(gè)任務(wù)后任務(wù)隊(duì)列的長(zhǎng)度 if (d != 0) signalWork(); w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); // false表示掃描到了任務(wù) return false; } // j < 0時(shí)隨機(jī)選取Worker線程 else if (j < 0) { // 異或移位,更新k r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } // j >= 0后按個(gè)嘗試線程 else ++k; } // 如果掃描不到任務(wù),但是scanGuard被更新了, // 說明有新的Worker線程被添加進(jìn)來 if (scanGuard != g) return false; else { // 從線程池的任務(wù)隊(duì)列中取出任務(wù)來執(zhí)行 // 邏輯和上面從其他線程的任務(wù)隊(duì)列偷任務(wù)類似 ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; } }Worker線程一上來就直接偷其他線程的任務(wù),自己的任務(wù)不管嗎?來看execTask就知道了:
final void execTask(ForkJoinTask<?> t) { currentSteal = t; for (;;) { // 首先執(zhí)行偷來的任務(wù) if (t != null) t.doExec(); // 先把自己的任務(wù)全部執(zhí)行,再返回去偷別的線程去執(zhí)行 if (queueTop == queueBase) break; // locallyFifo一般來自線程池的設(shè)置 // 為true使用FIFO的方式從隊(duì)列中取任務(wù)執(zhí)行 // 為false使用LIFO的方式(棧的方式)取任務(wù) t = locallyFifo ? locallyDeqTask() : popTask(); } // 更新偷任務(wù)的計(jì)數(shù) ++stealCount; currentSteal = null; }在線程池的work方法(見第23篇)中還涉及到一個(gè)tryAwaitWork方法,以下是該方法的解析:
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) { int v = w.eventCount; // ctl值的0-30位存儲(chǔ)了等待線程的信息 //(參考第23篇中work方法解析中關(guān)于ctl的解釋) // 等待線程是按照棧的方式存儲(chǔ)的,因此這里把原來排 // 第一位的等待線程設(shè)為當(dāng)前線程的下一個(gè),當(dāng)前線程 // 變成排到第一位 w.nextWait = (int)c; // 正在運(yùn)行的線程數(shù)減少1,因此把48-63位的AC值減1 long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); // 兩個(gè)條件等同于ctl發(fā)生了變化 if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { long d = ctl; // 第一個(gè)條件表示第一個(gè)等待線程已經(jīng)發(fā)生變化(ctl值的0-30位) // 第二個(gè)條件表示增加了正在運(yùn)行的線程數(shù)變少 // 兩個(gè)條件都滿足時(shí)返回true,強(qiáng)制再掃描一次 return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L; } // for (int sc = w.stealCount; sc != 0;) { // accumulate stealCount long s = stealCount; // 把線程w的stealCount加到線程池的stealCount上,然后再設(shè)置w // 的stealCount為0 if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc)) sc = w.stealCount = 0; // 線程自己的eventCount發(fā)生變化,則下次再更新stealCount else if (w.eventCount != v) return true; } // shutdown或者tryTerminate不為false表示當(dāng)前的線程沒有處于正在關(guān)閉狀態(tài) // (int)c != 0表示有線程在等待 // parallelism + (int)(nc >> AC_SHIFT)表示活躍線程數(shù)為0 // blockedCount == 0表示正在join等待的線程數(shù)為0 // quiescerCount == 0表示Quiesce線程池中的線程數(shù)為0 // 關(guān)于Quiesce線程池后面會(huì)做介紹 if ((!shutdown || !tryTerminate(false)) && (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 && blockedCount == 0 && quiescerCount == 0) // 滿足上述條件說明當(dāng)前線程池沒有任何線程在工作(包括運(yùn)行 // 任務(wù)和join等待),這種情況下,這個(gè)線程就會(huì)等待一段時(shí)間 // 然后如果還是沒有任何事件發(fā)生,就會(huì)把這個(gè)線程關(guān)閉。 idleAwaitWork(w, nc, c, v); for (boolean rescanned = false;;) { if (w.eventCount != v) return true; // 嘗試把當(dāng)前線程從等待隊(duì)列中移除, // 一旦移除,eventCount就會(huì)發(fā)生變化,然后返回 if (!rescanned) { int g = scanGuard, m = g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws != null && m < ws.length) { rescanned = true; for (int i = 0; i <= m; ++i) { ForkJoinWorkerThread u = ws[i]; if (u != null) { if (u.queueBase != u.queueTop && !tryReleaseWaiter()) rescanned = false; if (w.eventCount != v) return true; } } } if (scanGuard != g || (queueBase != queueTop && !tryReleaseWaiter())) rescanned = false; if (!rescanned) // 讓出控制權(quán),減少?zèng)_突 Thread.yield(); else // 在Park之前清除中斷狀態(tài) Thread.interrupted(); } else { w.parked = true; if (w.eventCount != v) { w.parked = false; return true; } LockSupport.park(this); rescanned = w.parked = false; } } }零零碎碎說了關(guān)于Fork的部分,后面會(huì)繼續(xù)說關(guān)于Join的過程。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注