SynchronousQueue是一種很特別的BlockingQueue,任何一個添加元素的操作都必須等到另外一個線程拿走元素才會結(jié)束。也就是SynchronousQueue本身不會存儲任何元素,相當(dāng)于生產(chǎn)者和消費(fèi)者手遞手直接交易。
SynchronousQueue有一個fair選項(xiàng),如果fair為true,稱為fair模式,否則就是unfair模式。
在fair模式下,所有等待的生產(chǎn)者線程或者消費(fèi)者線程會按照開始等待時間依次排隊(duì),然后按照等待先后順序進(jìn)行匹配交易。這種情況用隊(duì)列實(shí)現(xiàn)。
在unfair模式下,則剛好相反,后來先匹配,這種情況用棧實(shí)現(xiàn)。
*/ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); }因?yàn)樘砑釉睾湍米咴厥穷愃剖诌f手交易的,所以對于拿走元素和添加元素操作,SynchronousQueue調(diào)用的是Transferer同一個方法transfer。
當(dāng)object為null時表示是拿走元素,用于消費(fèi)者線程,否則則是添加元素,用于生產(chǎn)者線程。因此transfer方法是分析的重點(diǎn)。
abstract Object transfer(Object e, boolean timed, long nanos);
首先來看用于fair模式的TransferQueue的transfer方法:
看代碼之前,來理一下邏輯:
1. 開始隊(duì)列肯定是空。
2. 線程進(jìn)入隊(duì)列,如果隊(duì)列是空的,那么就添加該線程進(jìn)入隊(duì)列,然后進(jìn)行等待(要么有匹配線程出現(xiàn),要么就是該請求超時取消)
3. 第二個線程進(jìn)入,如果前面一個線程跟它屬于不同類型,也就是說兩者是可以匹配的,那么就從隊(duì)列刪除第一個線程。
如果是相同的線程,那么做法參照2。
理清了基本邏輯,也就是會有兩種情況:
1. 隊(duì)列為空或者隊(duì)列中的等待線程是相同類型
2. 隊(duì)列中的等待線程是匹配的類型
Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // e不是null表示是生成者線程,e就是產(chǎn)品,反之就是消費(fèi)者線程 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; // tail和head在隊(duì)列創(chuàng)建時會被初始化成一個虛擬節(jié)點(diǎn) // 因此發(fā)現(xiàn)沒有初始化,重新循環(huán)等待直到初始化完成 if (t == null || h == null) continue; // 隊(duì)列為空或等待線程類型相同(不同類型才能匹配) // 這兩種情況都要把當(dāng)前線程加入到等待隊(duì)列中 if (h == t || t.isData == isData) { QNode tn = t.next; // tail對象已經(jīng)被更新,出現(xiàn)不一致讀的現(xiàn)象,重新循環(huán) if (t != tail) continue; // 添加線程到等待隊(duì)列時會先更新當(dāng)前tail的next,然后 // 更新tail本身,因此出現(xiàn)只有next被更新的情況,應(yīng)該 // 更新tail,然后重新循環(huán) if (tn != null) { advanceTail(t, tn); continue; } // 設(shè)定了超時,剩余等待時間耗盡的時候,就無需再等待 if (timed && nanos <= 0) return null; // 首次使用s的時候,新建一個節(jié)點(diǎn)保存當(dāng)前線程和數(shù)據(jù)來初始化s if (s == null) s = new QNode(e, isData); // 嘗試更新tail的next,把新建節(jié)點(diǎn)添加到tail的后面,如果失敗了,就重新循環(huán) if (!t.casNext(null, s)) continue; // 把新建的節(jié)點(diǎn)設(shè)置為tail advanceTail(t, s); // 等待匹配線程,成功匹配則返回的匹配的值 // 否則返回當(dāng)前節(jié)點(diǎn),因此s和x相同表示請求被取消 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { clean(t, s); return null; } // 這個時候已經(jīng)匹配成功了,s應(yīng)該是排在第一個的等待線程 // 如果s依然在隊(duì)列中,那么需要更新head。 // 更新head的方法是把s這個排在第一位的節(jié)點(diǎn)作為新的head // 因此需要重置一些屬性使它變成虛擬節(jié)點(diǎn) if (!s.isOffList()) { advanceHead(t, s); if (x != null) s.item = s; s.waiter = null; } // x不為null表示拿到匹配線程的數(shù)據(jù)(消費(fèi)者拿到生產(chǎn)者的數(shù)據(jù)), // 因此返回該數(shù)據(jù),否則返回本身的數(shù)據(jù)(生成者返回自己的數(shù)據(jù)) return (x != null) ? x : e; } else { // 線程可以匹配 // 因?yàn)槭顷?duì)列,因此匹配的是第一個節(jié)點(diǎn) QNode m = h.next; // 同樣需要檢查不一致讀的情況 if (t != tail || m == null || h != head) continue; Object x = m.item; // 匹配失敗時,把m從隊(duì)列中移走,重新循環(huán) if (isData == (x != null) || // m已經(jīng)被匹配了 x == m || // m已經(jīng)被取消了 !m.casItem(x, e)) { // 用CAS設(shè)置m的數(shù)據(jù)為null advanceHead(h, m); continue; } // 匹配成功,更新head advanceHead(h, m); // 解除m的線程等待狀態(tài) LockSupport.unpark(m.waiter); // 返回匹配的數(shù)據(jù) return (x != null) ? x : e; } } }接著來用于Unfair模式的TransferStack的transfer方法
大體邏輯應(yīng)該是一樣的,不同就是隊(duì)列的入隊(duì)和出隊(duì)操作對應(yīng)到棧時就是入棧和出棧的操作。
Object transfer(Object e, boolean timed, long nanos) { SNode s = null; int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; // 棧為空或者節(jié)點(diǎn)類型相同的情況 if (h == null || h.mode == mode) { if (timed && nanos <= 0) { // 檢查棧頂節(jié)點(diǎn)是否已經(jīng)取消,如果已經(jīng)取消,彈出節(jié)點(diǎn) // 重新循環(huán),接著檢查新的棧頂節(jié)點(diǎn) if (h != null && h.isCancelled()) casHead(h, h.next); else return null; // 新建節(jié)點(diǎn),并且嘗試把新節(jié)點(diǎn)入棧 } else if (casHead(h, s = snode(s, e, h, mode))) { // 等待匹配,如果發(fā)現(xiàn)是被取消的情況,則釋放節(jié)點(diǎn),返回null SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null; } // 如果匹配的成功兩個節(jié)點(diǎn)是棧頂?shù)膬蓚€節(jié)點(diǎn) // 把這兩個節(jié)點(diǎn)都彈出 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (mode == REQUEST) ? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // 棧頂節(jié)點(diǎn)沒有和其他線程在匹配,可以匹配 if (h.isCancelled()) // 棧頂節(jié)點(diǎn)的請求已經(jīng)被取消 casHead(h, h.next); // 移除棧頂元素重新循環(huán) // 嘗試把該節(jié)點(diǎn)也入棧,該節(jié)點(diǎn)設(shè)置為正在匹配的狀態(tài) // 也就是isFulfilling返回true else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // 棧頂節(jié)點(diǎn)(當(dāng)前線程的節(jié)點(diǎn))和它的下一個節(jié)點(diǎn)進(jìn)行匹配,m為null意味著 // 棧里沒有其他節(jié)點(diǎn)了,因?yàn)榍懊嬖摴?jié)點(diǎn)入棧了,需要彈出這個節(jié)點(diǎn)重新循環(huán) SNode m = s.next; if (m == null) { casHead(s, null); s = null; break; } // 這個時候是有節(jié)點(diǎn)可以匹配的,嘗試為這兩個節(jié)點(diǎn)做匹配 SNode mn = m.next; // m和s匹配成功,彈出這兩個節(jié)點(diǎn),返回數(shù)據(jù);匹配失敗,把m移除 if (m.tryMatch(s)) { casHead(s, mn); return (mode == REQUEST) ? m.item : s.item; } else s.casNext(m, mn); } } // 棧頂正在匹配,參見代碼: // else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 做法基本類似,只是這里幫助其他線程匹配,無論成功與否 // 都要重新循環(huán) } else { SNode m = h.next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } }TransferQueue和TransferStack的算法實(shí)現(xiàn)可以參考 這里
新聞熱點(diǎn)
疑難解答