国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學(xué)院 > 開發(fā)設(shè)計 > 正文

《java.util.concurrent 包源碼閱讀》16 一種特別的BlockingQueue:SynchronousQueue

2019-11-14 20:59:39
字體:
供稿:網(wǎng)友
java.util.concurrent 包源碼閱讀》16 一種特別的BlockingQueue:SynchronousQueue

SynchronousQueue是一種很特別的BlockingQueue,任何一個添加元素的操作都必須等到另外一個線程拿走元素才會結(jié)束。也就是SynchronousQueue本身不會存儲任何元素,相當(dāng)于生產(chǎn)者和消費(fèi)者手遞手直接交易。

SynchronousQueue有一個fair選項(xiàng),如果fair為true,稱為fair模式,否則就是unfair模式。

在fair模式下,所有等待的生產(chǎn)者線程或者消費(fèi)者線程會按照開始等待時間依次排隊(duì),然后按照等待先后順序進(jìn)行匹配交易。這種情況用隊(duì)列實(shí)現(xiàn)。

在unfair模式下,則剛好相反,后來先匹配,這種情況用棧實(shí)現(xiàn)。

     */    public SynchronousQueue(boolean fair) {        transferer = fair ? new TransferQueue() : new TransferStack();    }

因?yàn)樘砑釉睾湍米咴厥穷愃剖诌f手交易的,所以對于拿走元素和添加元素操作,SynchronousQueue調(diào)用的是Transferer同一個方法transfer。

當(dāng)object為null時表示是拿走元素,用于消費(fèi)者線程,否則則是添加元素,用于生產(chǎn)者線程。因此transfer方法是分析的重點(diǎn)。

abstract Object transfer(Object e, boolean timed, long nanos);

首先來看用于fair模式的TransferQueue的transfer方法:

看代碼之前,來理一下邏輯:

1. 開始隊(duì)列肯定是空。

2. 線程進(jìn)入隊(duì)列,如果隊(duì)列是空的,那么就添加該線程進(jìn)入隊(duì)列,然后進(jìn)行等待(要么有匹配線程出現(xiàn),要么就是該請求超時取消)

3. 第二個線程進(jìn)入,如果前面一個線程跟它屬于不同類型,也就是說兩者是可以匹配的,那么就從隊(duì)列刪除第一個線程。

如果是相同的線程,那么做法參照2。

理清了基本邏輯,也就是會有兩種情況:

1. 隊(duì)列為空或者隊(duì)列中的等待線程是相同類型

2. 隊(duì)列中的等待線程是匹配的類型

        Object transfer(Object e, boolean timed, long nanos) {            QNode s = null;            // e不是null表示是生成者線程,e就是產(chǎn)品,反之就是消費(fèi)者線程            boolean isData = (e != null);            for (;;) {                QNode t = tail;                QNode h = head;                // tail和head在隊(duì)列創(chuàng)建時會被初始化成一個虛擬節(jié)點(diǎn)                // 因此發(fā)現(xiàn)沒有初始化,重新循環(huán)等待直到初始化完成                if (t == null || h == null)                    continue;                // 隊(duì)列為空或等待線程類型相同(不同類型才能匹配)                // 這兩種情況都要把當(dāng)前線程加入到等待隊(duì)列中                if (h == t || t.isData == isData) {                    QNode tn = t.next;                    // tail對象已經(jīng)被更新,出現(xiàn)不一致讀的現(xiàn)象,重新循環(huán)                    if (t != tail)                        continue;                    // 添加線程到等待隊(duì)列時會先更新當(dāng)前tail的next,然后                    // 更新tail本身,因此出現(xiàn)只有next被更新的情況,應(yīng)該                    // 更新tail,然后重新循環(huán)                    if (tn != null) {                        advanceTail(t, tn);                        continue;                    }                    // 設(shè)定了超時,剩余等待時間耗盡的時候,就無需再等待                    if (timed && nanos <= 0)                        return null;                    // 首次使用s的時候,新建一個節(jié)點(diǎn)保存當(dāng)前線程和數(shù)據(jù)來初始化s                    if (s == null)                        s = new QNode(e, isData);                    // 嘗試更新tail的next,把新建節(jié)點(diǎn)添加到tail的后面,如果失敗了,就重新循環(huán)                    if (!t.casNext(null, s))                        continue;                    // 把新建的節(jié)點(diǎn)設(shè)置為tail                    advanceTail(t, s);                    // 等待匹配線程,成功匹配則返回的匹配的值                    // 否則返回當(dāng)前節(jié)點(diǎn),因此s和x相同表示請求被取消                    Object x = awaitFulfill(s, e, timed, nanos);                    if (x == s) {                        clean(t, s);                        return null;                    }                    // 這個時候已經(jīng)匹配成功了,s應(yīng)該是排在第一個的等待線程                    // 如果s依然在隊(duì)列中,那么需要更新head。                    // 更新head的方法是把s這個排在第一位的節(jié)點(diǎn)作為新的head                    // 因此需要重置一些屬性使它變成虛擬節(jié)點(diǎn)                    if (!s.isOffList()) {                        advanceHead(t, s);                        if (x != null)                            s.item = s;                        s.waiter = null;                    }                    // x不為null表示拿到匹配線程的數(shù)據(jù)(消費(fèi)者拿到生產(chǎn)者的數(shù)據(jù)),                    // 因此返回該數(shù)據(jù),否則返回本身的數(shù)據(jù)(生成者返回自己的數(shù)據(jù))                    return (x != null) ? x : e;                } else { // 線程可以匹配                    // 因?yàn)槭顷?duì)列,因此匹配的是第一個節(jié)點(diǎn)                    QNode m = h.next;                    // 同樣需要檢查不一致讀的情況                    if (t != tail || m == null || h != head)                        continue;                    Object x = m.item;                    // 匹配失敗時,把m從隊(duì)列中移走,重新循環(huán)                    if (isData == (x != null) ||    // m已經(jīng)被匹配了                        x == m ||                   // m已經(jīng)被取消了                        !m.casItem(x, e)) {         // 用CAS設(shè)置m的數(shù)據(jù)為null                        advanceHead(h, m);                        continue;                    }                    // 匹配成功,更新head                    advanceHead(h, m);                    // 解除m的線程等待狀態(tài)                    LockSupport.unpark(m.waiter);                    // 返回匹配的數(shù)據(jù)                    return (x != null) ? x : e;                }            }        }

接著來用于Unfair模式的TransferStack的transfer方法

大體邏輯應(yīng)該是一樣的,不同就是隊(duì)列的入隊(duì)和出隊(duì)操作對應(yīng)到棧時就是入棧和出棧的操作。

        Object transfer(Object e, boolean timed, long nanos) {            SNode s = null;            int mode = (e == null) ? REQUEST : DATA;            for (;;) {                SNode h = head;                // 棧為空或者節(jié)點(diǎn)類型相同的情況                if (h == null || h.mode == mode) {                    if (timed && nanos <= 0) {                        // 檢查棧頂節(jié)點(diǎn)是否已經(jīng)取消,如果已經(jīng)取消,彈出節(jié)點(diǎn)                        // 重新循環(huán),接著檢查新的棧頂節(jié)點(diǎn)                        if (h != null && h.isCancelled())                            casHead(h, h.next);                        else                            return null;                    // 新建節(jié)點(diǎn),并且嘗試把新節(jié)點(diǎn)入棧                    } else if (casHead(h, s = snode(s, e, h, mode))) {                        // 等待匹配,如果發(fā)現(xiàn)是被取消的情況,則釋放節(jié)點(diǎn),返回null                        SNode m = awaitFulfill(s, timed, nanos);                        if (m == s) {                            clean(s);                            return null;                        }                        // 如果匹配的成功兩個節(jié)點(diǎn)是棧頂?shù)膬蓚€節(jié)點(diǎn)                        // 把這兩個節(jié)點(diǎn)都彈出                        if ((h = head) != null && h.next == s)                            casHead(h, s.next);     // help s's fulfiller                        return (mode == REQUEST) ? m.item : s.item;                    }                } else if (!isFulfilling(h.mode)) { // 棧頂節(jié)點(diǎn)沒有和其他線程在匹配,可以匹配                    if (h.isCancelled())            // 棧頂節(jié)點(diǎn)的請求已經(jīng)被取消                        casHead(h, h.next);         // 移除棧頂元素重新循環(huán)                    // 嘗試把該節(jié)點(diǎn)也入棧,該節(jié)點(diǎn)設(shè)置為正在匹配的狀態(tài)                    // 也就是isFulfilling返回true                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {                        for (;;) {                            // 棧頂節(jié)點(diǎn)(當(dāng)前線程的節(jié)點(diǎn))和它的下一個節(jié)點(diǎn)進(jìn)行匹配,m為null意味著                            // 棧里沒有其他節(jié)點(diǎn)了,因?yàn)榍懊嬖摴?jié)點(diǎn)入棧了,需要彈出這個節(jié)點(diǎn)重新循環(huán)                            SNode m = s.next;                            if (m == null) {                                casHead(s, null);                                s = null;                                break;                            }                            // 這個時候是有節(jié)點(diǎn)可以匹配的,嘗試為這兩個節(jié)點(diǎn)做匹配                            SNode mn = m.next;                            // m和s匹配成功,彈出這兩個節(jié)點(diǎn),返回數(shù)據(jù);匹配失敗,把m移除                            if (m.tryMatch(s)) {                                casHead(s, mn);                                return (mode == REQUEST) ? m.item : s.item;                            } else                                s.casNext(m, mn);                        }                    }                // 棧頂正在匹配,參見代碼:                // else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {                // 做法基本類似,只是這里幫助其他線程匹配,無論成功與否                // 都要重新循環(huán)                } else {                    SNode m = h.next;                                   if (m == null)                        casHead(h, null);                    else {                        SNode mn = m.next;                        if (m.tryMatch(h))                            casHead(h, mn);                        else                            h.casNext(m, mn);                    }                }            }        }

TransferQueue和TransferStack的算法實(shí)現(xiàn)可以參考 這里


發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 黔南| 上蔡县| 永平县| 徐闻县| 拜城县| 岳阳县| 吉水县| 松原市| 德令哈市| 巴彦县| 城固县| 平南县| 台南市| 竹北市| 静乐县| 鄂托克旗| 建阳市| 天门市| 郧西县| 县级市| 河源市| 略阳县| 沛县| 马边| 赞皇县| 陆丰市| 堆龙德庆县| 阿瓦提县| 肃南| 海城市| 衡南县| 措勤县| 曲阜市| 甘孜县| 克什克腾旗| 吴江市| 长宁区| 龙南县| 正蓝旗| 蒙城县| 太康县|