国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

《java.util.concurrent 包源碼閱讀》25 Fork/Join框架之Fork與Work-Stealing(重寫23,24)

2019-11-14 21:09:56
字體:
供稿:網(wǎng)友
java.util.concurrent 包源碼閱讀》25 Fork/Join框架之Fork與Work-Stealing(重寫23,24)

在寫前面兩篇文章23和24的時(shí)候自己有很多細(xì)節(jié)搞得不是很明白,這篇文章把Fork和Work-Stealing相關(guān)的源代碼重新梳理一下。

首先來看一些線程池定義的成員變量:

關(guān)于scanGuard:

volatile int scanGuard;PRivate static final int SG_UNIT = 1 << 16;private static final int  SMASK      = 0xffff;

scanGuard低位16位數(shù)值(0到15位)始終等于2的N次方減去1,代表的是大于Worker線程數(shù)的最小的2的N次方減去1。因此每次要取低16位數(shù)據(jù)時(shí)都要用到SMASK。

scanGuard的第16位是一個(gè)標(biāo)志位,被當(dāng)成是一個(gè)更新worker線程數(shù)組的鎖使用。當(dāng)該位的數(shù)據(jù)是1時(shí),表示worker線程數(shù)組被鎖住,其他線程無法更新worker線程。

要更新第16位的數(shù)值,就需要用到SG_UNIT。

再來說說與任務(wù)隊(duì)列有關(guān)的三個(gè)變量:

// 存儲(chǔ)任務(wù)的數(shù)組,長(zhǎng)度是2的N次方ForkJoinTask<?>[] queue;// 最后一個(gè)元素?cái)?shù)組下標(biāo)+1// 如果把數(shù)組看成是隊(duì)列,那么該位置就是隊(duì)列尾部(FIFO添加元素)// 如果看成是棧,那么該位置就棧頂(LIFO拿走元素)// 只能當(dāng)前線程會(huì)使用這個(gè)數(shù)值,不存在多線程問題,因此不用volatileint queueTop;// 第一個(gè)元素的數(shù)組下標(biāo)// 也就是隊(duì)列的頭部的位置,從隊(duì)列中拿走元素時(shí),該數(shù)值加1// 其他線程偷任務(wù)(FIFO方式)時(shí)會(huì)更新這個(gè)變量,因此需要volatilevolatile int queueBase;

任務(wù)隊(duì)列的設(shè)計(jì)和Work-Stealing要求的一致(支持LIFO和FIFO)。

下面是scan方法源代碼解析(補(bǔ)充了一些細(xì)節(jié)):

    private boolean scan(ForkJoinWorkerThread w, int a) {        int g = scanGuard;        // parallelism表示并發(fā)數(shù),一般等于CPU可以同時(shí)運(yùn)行的線程數(shù),        // 默認(rèn)值是Runtime類的availableProcessors方法返回值,表示        // 處理器的數(shù)量,因此parallelism大于0。        // a是活躍的Worker線程數(shù),肯定大于等于0,因此        // 條件parallelism == 1 - a滿足意味著parallelism為1而a為0。        // 也就是當(dāng)前沒有Worker線程在執(zhí)行任務(wù)。blockedCount為0意味        // 著沒有線程因?yàn)閖oin被阻塞。        // 兩個(gè)條件同時(shí)滿足也就意味既沒有任何線程在運(yùn)行,那么也就        // 意味著不可能有任務(wù)存放于worker線程,所以m=0,也就是沒        // 法偷任務(wù)。        // g & SMASK返回的值scanGuard的0到15位的數(shù)值(一個(gè)2的N次方減去1的值)        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;        ForkJoinWorkerThread[] ws = workers;        if (ws == null || ws.length <= m)             return false;        // 偷任務(wù)        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;            // 從線程隊(duì)列中隨機(jī)獲取一個(gè)worker線程            ForkJoinWorkerThread v = ws[k & m];            // v!=null表示隨機(jī)索引的線程存在            // queueBase不等于queueTop表示線程的任務(wù)隊(duì)列不為空            // v.queue不為null表示任務(wù)隊(duì)列已經(jīng)被初始化            // (q.length - 1) 同樣是2的N次方減一,和b相與得到一個(gè)            // 在數(shù)組長(zhǎng)度范圍內(nèi)的數(shù)組下標(biāo)            // 這一串判斷是為了確認(rèn)找到了一個(gè)有任務(wù)的線程來偷任務(wù)            if (v != null && (b = v.queueBase) != v.queueTop &&                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {                // u是計(jì)算Unsafe的索引,用以CAS操作                long u = (i << ASHIFT) + ABASE;                // (t = q[i]) != null用以判斷數(shù)組該位置存有任務(wù)                // v.queueBase == b為了確認(rèn)沒有線程拿走任務(wù)                // CAS操作把該數(shù)組元素設(shè)為null表示拿走任務(wù)                if ((t = q[i]) != null && v.queueBase == b &&                    UNSAFE.compareAndSwapObject(q, u, t, null)) {                    //v.queueBase = b + 1更新隊(duì)列頭部位置                    int d = (v.queueBase = b + 1) - v.queueTop;                    v.stealHint = w.poolIndex;                    // d是偷走一個(gè)任務(wù)后任務(wù)隊(duì)列的長(zhǎng)度                    if (d != 0)                        signalWork();                    w.execTask(t);                }                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);                // false表示掃描到了任務(wù)                return false;            }            // j < 0時(shí)隨機(jī)選取Worker線程            else if (j < 0) {                     // 異或移位,更新k                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;            }            // j >= 0后按個(gè)嘗試線程            else                ++k;        }        // 如果掃描不到任務(wù),但是scanGuard被更新了,        // 說明有新的Worker線程被添加進(jìn)來        if (scanGuard != g)            return false;        else {            // 從線程池的任務(wù)隊(duì)列中取出任務(wù)來執(zhí)行            // 邏輯和上面從其他線程的任務(wù)隊(duì)列偷任務(wù)類似            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;            if ((b = queueBase) != queueTop &&                (q = submissionQueue) != null &&                (i = (q.length - 1) & b) >= 0) {                long u = (i << ASHIFT) + ABASE;                if ((t = q[i]) != null && queueBase == b &&                    UNSAFE.compareAndSwapObject(q, u, t, null)) {                    queueBase = b + 1;                    w.execTask(t);                }                return false;            }            return true;        }    }

Worker線程一上來就直接偷其他線程的任務(wù),自己的任務(wù)不管嗎?來看execTask就知道了:

    final void execTask(ForkJoinTask<?> t) {        currentSteal = t;        for (;;) {            // 首先執(zhí)行偷來的任務(wù)            if (t != null)                t.doExec();            // 先把自己的任務(wù)全部執(zhí)行,再返回去偷別的線程去執(zhí)行            if (queueTop == queueBase)                break;            // locallyFifo一般來自線程池的設(shè)置            // 為true使用FIFO的方式從隊(duì)列中取任務(wù)執(zhí)行            // 為false使用LIFO的方式(棧的方式)取任務(wù)            t = locallyFifo ? locallyDeqTask() : popTask();        }        // 更新偷任務(wù)的計(jì)數(shù)        ++stealCount;        currentSteal = null;    }

在線程池的work方法(見第23篇)中還涉及到一個(gè)tryAwaitWork方法,以下是該方法的解析:

    private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {        int v = w.eventCount;        // ctl值的0-30位存儲(chǔ)了等待線程的信息        //(參考第23篇中work方法解析中關(guān)于ctl的解釋)        // 等待線程是按照棧的方式存儲(chǔ)的,因此這里把原來排        // 第一位的等待線程設(shè)為當(dāng)前線程的下一個(gè),當(dāng)前線程        // 變成排到第一位        w.nextWait = (int)c;        // 正在運(yùn)行的線程數(shù)減少1,因此把48-63位的AC值減1        long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));        // 兩個(gè)條件等同于ctl發(fā)生了變化        if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {            long d = ctl;             // 第一個(gè)條件表示第一個(gè)等待線程已經(jīng)發(fā)生變化(ctl值的0-30位)            // 第二個(gè)條件表示增加了正在運(yùn)行的線程數(shù)變少            // 兩個(gè)條件都滿足時(shí)返回true,強(qiáng)制再掃描一次            return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;        }        //         for (int sc = w.stealCount; sc != 0;) {   // accumulate stealCount            long s = stealCount;            // 把線程w的stealCount加到線程池的stealCount上,然后再設(shè)置w            // 的stealCount為0            if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))                sc = w.stealCount = 0;            // 線程自己的eventCount發(fā)生變化,則下次再更新stealCount            else if (w.eventCount != v)                return true;        }        // shutdown或者tryTerminate不為false表示當(dāng)前的線程沒有處于正在關(guān)閉狀態(tài)        // (int)c != 0表示有線程在等待        // parallelism + (int)(nc >> AC_SHIFT)表示活躍線程數(shù)為0        // blockedCount == 0表示正在join等待的線程數(shù)為0        // quiescerCount == 0表示Quiesce線程池中的線程數(shù)為0        // 關(guān)于Quiesce線程池后面會(huì)做介紹        if ((!shutdown || !tryTerminate(false)) &&            (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&            blockedCount == 0 && quiescerCount == 0)            // 滿足上述條件說明當(dāng)前線程池沒有任何線程在工作(包括運(yùn)行            // 任務(wù)和join等待),這種情況下,這個(gè)線程就會(huì)等待一段時(shí)間            // 然后如果還是沒有任何事件發(fā)生,就會(huì)把這個(gè)線程關(guān)閉。            idleAwaitWork(w, nc, c, v);        for (boolean rescanned = false;;) {            if (w.eventCount != v)                return true;            // 嘗試把當(dāng)前線程從等待隊(duì)列中移除,            // 一旦移除,eventCount就會(huì)發(fā)生變化,然后返回            if (!rescanned) {                int g = scanGuard, m = g & SMASK;                ForkJoinWorkerThread[] ws = workers;                if (ws != null && m < ws.length) {                    rescanned = true;                    for (int i = 0; i <= m; ++i) {                        ForkJoinWorkerThread u = ws[i];                        if (u != null) {                            if (u.queueBase != u.queueTop &&                                !tryReleaseWaiter())                                rescanned = false;                            if (w.eventCount != v)                                return true;                        }                    }                }                if (scanGuard != g ||                    (queueBase != queueTop && !tryReleaseWaiter()))                    rescanned = false;                if (!rescanned)                    // 讓出控制權(quán),減少?zèng)_突                    Thread.yield();                else                    // 在Park之前清除中斷狀態(tài)                    Thread.interrupted();            }            else {                w.parked = true;                if (w.eventCount != v) {                    w.parked = false;                    return true;                }                LockSupport.park(this);                rescanned = w.parked = false;            }        }    }

零零碎碎說了關(guān)于Fork的部分,后面會(huì)繼續(xù)說關(guān)于Join的過程。


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 乌鲁木齐县| 合水县| 福海县| 舒城县| 怀远县| 承德市| 上犹县| 乐安县| 苏尼特右旗| 隆昌县| 德格县| 濮阳县| 仁化县| 西贡区| 商南县| 凤庆县| 宣恩县| 濉溪县| 商南县| 靖宇县| 泾阳县| 濮阳县| 宕昌县| 响水县| 五家渠市| 嘉兴市| 菏泽市| 大渡口区| 康平县| 商河县| 丹阳市| 启东市| 上蔡县| 增城市| 锡林郭勒盟| 昆山市| 始兴县| 定西市| 大兴区| 衡南县| 平邑县|