1. HashMap在高并發(fā)的環(huán)境下,執(zhí)行put操作會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),從而導(dǎo)致Entry的next節(jié)點(diǎn)始終不為空,因此產(chǎn)生死循環(huán)獲取Entry
2. HashTable雖然是線程安全的,但是效率低下,當(dāng)一個(gè)線程訪問HashTable的同步方法時(shí),其他線程如果也訪問HashTable的同步方法,那么會進(jìn)入阻塞或者輪訓(xùn)狀態(tài)。
3. 在jdk1.6中ConcurrentHashMap使用鎖分段技術(shù)提高并發(fā)訪問效率。首先將數(shù)據(jù)分成一段一段地存儲,然后給每一段數(shù)據(jù)配一個(gè)鎖,當(dāng)一個(gè)線程占用鎖訪問其中一段數(shù)據(jù)時(shí),其他段的數(shù)據(jù)也能被其他線程訪問。然而在jdk1.8中的實(shí)現(xiàn)已經(jīng)拋棄了Segment分段鎖機(jī)制,利用CAS+Synchronized來保證并發(fā)更新的安全,底層依然采用數(shù)組+鏈表+紅黑樹的存儲結(jié)構(gòu)。
ConcurrentHashMap采用 分段鎖的機(jī)制,實(shí)現(xiàn)并發(fā)的更新操作,底層由Segment數(shù)組和HashEntry數(shù)組組成。Segment繼承ReentrantLock用來充當(dāng)鎖的角色,每個(gè) Segment 對象守護(hù)每個(gè)散列映射表的若干個(gè)桶。HashEntry 用來封裝映射表的鍵 / 值對;每個(gè)桶是由若干個(gè) HashEntry 對象鏈接起來的鏈表。一個(gè) ConcurrentHashMap 實(shí)例中包含由若干個(gè) Segment 對象組成的數(shù)組,下面我們通過一個(gè)圖來演示一下 ConcurrentHashMap 的結(jié)構(gòu):

改進(jìn)一:取消segments字段,直接采用transient volatile HashEntry<K,V> table保存數(shù)據(jù),采用table數(shù)組元素作為鎖,從而實(shí)現(xiàn)了對每一行數(shù)據(jù)進(jìn)行加鎖,進(jìn)一步減少并發(fā)沖突的概率。
改進(jìn)二:將原先table數(shù)組+單向鏈表的數(shù)據(jù)結(jié)構(gòu),變更為table數(shù)組+單向鏈表+紅黑樹的結(jié)構(gòu)。對于hash表來說,最核心的能力在于將key hash之后能均勻的分布在數(shù)組中。如果hash之后散列的很均勻,那么table數(shù)組中的每個(gè)隊(duì)列長度主要為0或者1。但實(shí)際情況并非總是如此理想,雖然ConcurrentHashMap類默認(rèn)的加載因子為0.75,但是在數(shù)據(jù)量過大或者運(yùn)氣不佳的情況下,還是會存在一些隊(duì)列長度過長的情況,如果還是采用單向列表方式,那么查詢某個(gè)節(jié)點(diǎn)的時(shí)間復(fù)雜度為O(n);因此,對于個(gè)數(shù)超過8(默認(rèn)值)的列表,jdk1.8中采用了紅黑樹的結(jié)構(gòu),那么查詢的時(shí)間復(fù)雜度可以降低到O(logN),可以改進(jìn)性能。
Node:保存key,value及key的hash值的數(shù)據(jù)結(jié)構(gòu)。其中value和next都用volatile修飾,保證并發(fā)的可見性。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val;//volatile類型的 volatile Node<K,V> next;//volatile類型的 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } //省略部分代碼 }ForwardingNode:一個(gè)特殊的Node節(jié)點(diǎn),hash值為-1,其中存儲nextTable的引用。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //省略部分代碼 }接下來具體看看第四個(gè)構(gòu)造函數(shù)的具體實(shí)現(xiàn):
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) //至少使用盡可能多的bin initialCapacity = concurrencyLevel; //作為估計(jì)線程 long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap;//初始化sizeCtl } /** *返回給定所需容量,table的大小總是2的冪次方 **/ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }ConcurrentHashMap在構(gòu)造函數(shù)中只會初始化sizeCtl值,并不會直接初始化table,而是延緩到第一次put操作
我們還是繼續(xù)一步步看代碼,看inputVal的注釋a,這個(gè)方法helpTransfer,如果線程進(jìn)入到這邊說明已經(jīng)有其他線程正在做擴(kuò)容操作,這個(gè)是一個(gè)輔助方法
/** * Helps transfer if a resize is in progress. */final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //下面幾種情況和addCount的方法一樣,請參考addCount的備注 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table;}當(dāng)我們的putVal執(zhí)行到addCount的時(shí)候
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟來都baseCount都加1因?yàn)閤=1 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //多線程CAS發(fā)生失敗的時(shí)候執(zhí)行 fullAddCount(x, uncontended);//2 return; } if (check <= 1) return; s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //當(dāng)條件滿足開始擴(kuò)容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) {//如果小于0說明已經(jīng)有線程在進(jìn)行擴(kuò)容操作了 //一下的情況說明已經(jīng)有在擴(kuò)容或者多線程進(jìn)行了擴(kuò)容,其他線程直接break不要進(jìn)入擴(kuò)容操作 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//如果相等說明擴(kuò)容已經(jīng)完成,可以繼續(xù)擴(kuò)容 transfer(tab, nt); } //這個(gè)時(shí)候sizeCtl已經(jīng)等于(rs << RESIZE_STAMP_SHIFT) + 2等于一個(gè)大的負(fù)數(shù),這邊加上2很巧妙,因?yàn)閠ransfer后面對sizeCtl--操作的時(shí)候,最多只能減兩次就結(jié)束 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }}看上面注釋1,每次都會對baseCount 加1,如果并發(fā)競爭太大,那么可能導(dǎo)致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失敗,那么為了提高高并發(fā)的時(shí)候baseCount可見性失敗的問題,又避免一直重試,這樣性能會有很大的影響,那么在jdk8的時(shí)候是有引入一個(gè)類Striped64,其中LongAdder和DoubleAdder就是對這個(gè)類的實(shí)現(xiàn)。這兩個(gè)方法都是為解決高并發(fā)場景而生的,是AtomicLong的加強(qiáng)版,AtomicLong在高并發(fā)場景性能會比LongAdder差。但是LongAdder的空間復(fù)雜度會高點(diǎn)。
我們每次進(jìn)來都對baseCount進(jìn)行加1當(dāng)達(dá)到一定的容量時(shí),就需要對table進(jìn)行擴(kuò)容。擴(kuò)容方法就是transfer,這個(gè)方法稍微復(fù)雜一點(diǎn),大部分的代碼我都做了注釋
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //構(gòu)建一個(gè)連節(jié)點(diǎn)的指針,用于標(biāo)識位 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; //循環(huán)的關(guān)鍵變量,判斷是否已經(jīng)擴(kuò)容完成,完成就return,退出循環(huán) boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //循環(huán)的關(guān)鍵i,i--操作保證了倒序遍歷數(shù)組 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(默認(rèn)16) i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //i<0說明已經(jīng)遍歷完舊的數(shù)組tab;i>=n什么時(shí)候有可能呢?在下面看到i=n,所以目前i最大應(yīng)該是n吧。 //i+n>=nextn,nextn=nextTab.length,所以如果滿足i+n>=nextn說明已經(jīng)擴(kuò)容完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) {// a nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //利用CAS方法更新這個(gè)擴(kuò)容閾值,在這里面sizectl值減一,說明新加入一個(gè)線程參與到擴(kuò)容操作,參考sizeCtl的注釋 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //如果有多個(gè)線程進(jìn)行擴(kuò)容,那么這個(gè)值在第二個(gè)線程以后就不會相等,因?yàn)閟izeCtl已經(jīng)被減1了,所以后面的線程就只能直接返回,始終保證只有一個(gè)線程執(zhí)行了 a(上面注釋a) if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true;//finishing和advance保證線程已經(jīng)擴(kuò)容完成了可以退出循環(huán) i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null)//如果tab[i]為null,那么就把fwd插入到tab[i],表明這個(gè)節(jié)點(diǎn)已經(jīng)處理過了 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED)//那么如果f.hash=-1的話說明該節(jié)點(diǎn)為ForwardingNode,說明該節(jié)點(diǎn)已經(jīng)處理過了 advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; //這邊還對鏈表進(jìn)行遍歷,這邊的的算法和hashmap的算法又不一樣了,這班是有點(diǎn)對半拆分的感覺 //把鏈表分表拆分為,hash&n等于0和不等于0的,然后分別放在新表的i和i+n位置 //次方法同hashmap的resize for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); //把已經(jīng)替換的節(jié)點(diǎn)的舊tab的i的位置用fwd替換,fwd包含nextTab setTabAt(tab, i, fwd); advance = true; }//下面紅黑樹基本和鏈表差不多 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //判斷擴(kuò)容后是否還需要紅黑樹結(jié)構(gòu) ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }}值得細(xì)細(xì)品味的是,transfer的for循環(huán)是倒敘的,說明對table的遍歷是從table.length-1開始到0的。我覺得這段代碼寫得太牛逼了,特別是
//利用CAS方法更新這個(gè)擴(kuò)容閾值,在這里面sizectl值減一,說明新加入一個(gè)線程參與到擴(kuò)容操作,參考sizeCtl的注釋if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //如果有多個(gè)線程進(jìn)行擴(kuò)容,那么這個(gè)值在第二個(gè)線程以后就不會相等,因?yàn)閟izeCtl已經(jīng)被減1了,所以后面的線程就只能直接返回,始終保證只有一個(gè)線程執(zhí)行了 a(上面注釋a) if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true;//finishing和advance保證線程已經(jīng)擴(kuò)容完成了可以退出循環(huán) i = n; // recheck before commit}注意:如果鏈表結(jié)構(gòu)中元素超過TREEIFY_THRESHOLD閾值,默認(rèn)為8個(gè),則把鏈表轉(zhuǎn)化為紅黑樹,提高遍歷查詢效率.接下來我們看看如何構(gòu)造樹結(jié)構(gòu),代碼如下:
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } }}可以看出,生成樹節(jié)點(diǎn)的代碼塊是同步的,進(jìn)入同步代碼塊之后,再次驗(yàn)證table中index位置元素是否被修改過。 1、根據(jù)table中index位置Node鏈表,重新生成一個(gè)hd為頭結(jié)點(diǎn)的TreeNode鏈表。 2、根據(jù)hd頭結(jié)點(diǎn),生成TreeBin樹結(jié)構(gòu),并把樹結(jié)構(gòu)的root節(jié)點(diǎn)寫到table的index位置的內(nèi)存中,具體實(shí)現(xiàn)如下:
TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } this.root = r; assert checkInvariants(root);}這個(gè)get請求,我們需要cas來保證變量的原子性。如果tab[i]正被鎖住,那么CAS就會失敗,失敗之后就會不斷的重試。這也保證了get在高并發(fā)情況下不會出錯(cuò)。 我們來分析下到底有多少種情況會導(dǎo)致get在并發(fā)的情況下可能取不到值。1、一個(gè)線程在get的時(shí)候,另一個(gè)線程在對同一個(gè)key的node進(jìn)行remove操作;2、一個(gè)線程在get的時(shí)候,另一個(gè)線程正則重排table??赡軐?dǎo)致舊table取不到值。 那么本質(zhì)是,我在get的時(shí)候,有其他線程在對同一桶的鏈表或樹進(jìn)行修改。那么get是怎么保證同步性的呢?我們看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是干嘛的:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}它是對tab[i]進(jìn)行原子性的讀取,因?yàn)槲覀冎纏utVal等對table的桶操作是有加鎖的,那么一般情況下我們對桶的讀也是要加鎖的,但是我們這邊為什么不需要加鎖呢?因?yàn)槲覀冇昧薝nsafe的getObjectVolatile,因?yàn)閠able是volatile類型,所以對tab[i]的原子請求也是可見的。因?yàn)槿绻秸_的情況下,根據(jù)happens-before原則,對volatile域的寫入操作happens-before于每一個(gè)后續(xù)對同一域的讀操作。所以不管其他線程對table鏈表或樹的修改,都對get讀取可見。
深入淺出ConcurrentHashMap(1.8) 作者 占小狼
探索jdk8之ConcurrentHashMap 的實(shí)現(xiàn)機(jī)制 作者 淮左
java并發(fā)編程總結(jié)4——ConcurrentHashMap在jdk1.8中的改進(jìn) 作者 everSeeker
新聞熱點(diǎn)
疑難解答