Exchanger可以看做雙向數(shù)據(jù)傳輸?shù)腟ynchronousQueue,即沒(méi)有生產(chǎn)者和消費(fèi)者之分,任意兩個(gè)線程都可以交換數(shù)據(jù)。
在JDK5中Exchanger被設(shè)計(jì)成一個(gè)容量為1的容器,存放一個(gè)等待線程,直到有另外線程到來(lái)就會(huì)發(fā)生數(shù)據(jù)交換,然后清空容器,等到下一個(gè)到來(lái)的線程。
從JDK6開(kāi)始,Exchanger用了類(lèi)似ConcurrentMap的分段思想,提供了多個(gè)slot,增加了并發(fā)執(zhí)行時(shí)的吞吐量。
Exchanger不存在公平不公平的模式,因?yàn)闆](méi)有排隊(duì)的情況發(fā)生,只要有兩個(gè)線程就可以發(fā)生數(shù)據(jù)交換。
直接看核心方法:
PRivate Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // index是線程ID的hash值映射到0到max之間的一個(gè)值 // 一般情況下max為0,這樣線程交換數(shù)據(jù)只會(huì)使用第一個(gè)slot, // 即index是0,而max不為0情況請(qǐng)看下面的循環(huán) int index = hashIndex(); // CAS操作失敗的次數(shù) int fails = 0; for (;;) { // 當(dāng)前slot中存儲(chǔ)的對(duì)象,也就是Node Object y; Slot slot = arena[index]; // 延遲加載,即只有當(dāng)slot為null時(shí)才創(chuàng)建一個(gè)slot // 延遲加載后重新循環(huán)一次 if (slot == null) createSlot(index); // slot中有數(shù)據(jù),也就意味著有線程在等待交換數(shù)據(jù) // 這時(shí)可以嘗試用CAS重置slot(把slot存儲(chǔ)的對(duì)象設(shè)為null) // 用slot中存儲(chǔ)的對(duì)象和當(dāng)前線程進(jìn)行數(shù)據(jù)交換 // 如果交換成功就通知原先等待的線程 else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { Node you = (Node)y; if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // 如果slot存儲(chǔ)的對(duì)象已經(jīng)被重置為null,但是數(shù)據(jù)交換失敗了 // 這時(shí)就意味著這個(gè)等待的線程的交換請(qǐng)求被取消了 // 在分析wait類(lèi)型的方法代碼時(shí)會(huì)看到如何處理這種情況 } // 如果slot中沒(méi)有存儲(chǔ)對(duì)象,那么首先嘗試把當(dāng)前線程存儲(chǔ)到slot中 // 如果存儲(chǔ)失敗了,就重新循環(huán) else if (y == null && slot.compareAndSet(null, me)) { // index為0意味著僅僅有當(dāng)前線程在等待交換數(shù)據(jù),因此直接等待即可 if (index == 0) return timed ? awaitNanos(me, slot, nanos) : await(me, slot); // 所謂的spin wait:就是固定次數(shù)循環(huán),每次計(jì)數(shù)減一 // 對(duì)于單核系統(tǒng)來(lái)說(shuō),spin wait是不做的,因?yàn)閱魏? // 做wait時(shí)需要占用CPU,其他線程是無(wú)法使用CPU,因此這樣 // 的等待毫無(wú)意義。而多核系統(tǒng)中spin值為2000,也就是會(huì)做 // 2000次循環(huán)。 // 如果循環(huán)完成后依然沒(méi)有得到交換的數(shù)據(jù),那么會(huì)返回一個(gè) // CANCEL對(duì)象表示請(qǐng)求依舊被取消,并且把Node從slot中清除 Object v = spinWait(me, slot); if (v != CANCEL) return v; // 如果取消了,就新建一個(gè)Node取消原先取消的Node用于下次循環(huán) me = new Node(item); int m = max.get(); // index除2,縮小slot的范圍 // 同時(shí)如果m過(guò)大,減小m if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } // 允許CAS失敗兩次,因?yàn)閮蓚€(gè)else if中都有CAS,因此這里 // 允許兩個(gè)else if的CAS操作都失敗過(guò) else if (++fails > 1) { int m = max.get(); // 失敗超過(guò)3次,增大m,并且從m處重新索引 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // 當(dāng)index小于0,回到m,重新循環(huán) else if (--index < 0) index = m; } } }
這篇文章關(guān)于索引index這塊弄得不是很清楚,后續(xù)會(huì)繼續(xù)研究,及時(shí)更新。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注