廢話就不多說了,直入正題。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable
一、基本性質1、支持完全并發(fā)的讀操作和可控并發(fā)級別的寫操作的Map,方法的基本功能與Hashtable大致相同,但是使用鎖分段技術,沒有使用全局鎖,整體上更有效率,能夠在大多數(shù)并發(fā)場景中替代Hashtable。2、不允許null key和null value(1.7之前的代碼中,指出非常特殊情況可能會讀到null value,但是在jsr133后的新內(nèi)存模型中是不會發(fā)生的,也就是實際中不會讀到null value,1.7把這個無用的代碼去掉了)。3、專為并發(fā)設計的集合類。它的讀操作不加鎖(null value的問題不說了),可以和寫操作并行,但是讀操作只反映最近完成的結果,也可能和它開始執(zhí)行的那一瞬間的狀態(tài)有出入,這在一般的并發(fā)場景中可以接受。基本的寫操作加鎖,putAll和clear這兩個批量操作沒有使用全局鎖,因此并不保證結果和預期的一致(比如clear執(zhí)行后的一瞬間,可能map中會有其他線程添加的數(shù)據(jù))。它還不是真正意義上的線程安全,記住線程安全的關鍵:線程一個個串行,和多線程并發(fā)的執(zhí)行,產(chǎn)生的效果(對外界可見的影響)一樣。很明顯ConcurrentHashMap達不到這種程度。它的這種并發(fā)控制更像是數(shù)據(jù)庫隔離級別。它沒有全局鎖(數(shù)據(jù)庫進程鎖/庫鎖),使用的是Segment的鎖(數(shù)據(jù)庫表級別的鎖)。根據(jù)寫阻塞寫不阻塞讀這一點,可以認為它具體的隔離級別是"Read uncommitted"這個級別,即不會丟失寫操作帶來的影響,但是會讀取到"中間態(tài)數(shù)據(jù)/臟數(shù)據(jù)"(寫操作方法還沒有返回,就能看到寫操作帶來的部分影響)。至于”不可重復讀“、”幻讀“這兩個,根據(jù)具體實現(xiàn),Segment的讀方法都只讀一次(加鎖讀不管),可以認為是沒有的。“臟數(shù)據(jù)”這個,可以認為只在putAll、clear中才會出現(xiàn)。基本的put/remove/replace,因為都只操作一個K-V,因此可以認為它們的執(zhí)行過程中是沒有“臟數(shù)據(jù)”出現(xiàn)的。ConcurrentHashMap在多線程之間可能會存在不一致狀態(tài)。但是針對簡單的讀寫操作來說,這些不一致的狀態(tài),要么是歷史的一致狀態(tài),要么是未來的一致狀態(tài),不會出現(xiàn)錯誤的狀態(tài)(HashMap多線程操作就會出現(xiàn)錯誤狀態(tài),比如死循環(huán)、更新丟失)。在很多程序中還是能接受這種不一致的,當然,如果你需要更高的一致性,比如獲取需要同時獲取ConcurrentHashMap中某幾個key在某個絕對的同一時刻對應的value,那么它自帶的方法就不能完成了,這時候就要依賴更外部的范圍更大鎖。這種差不多就算是通常所說的事務了,api是基本不提供自帶事務性的方法的,需要你自己控制。4、弱一致性。第3點中一起說了。5、ConcurrentHashMap的迭代器是弱一致性的,不是fail-fast快速失敗的。即使它在迭代時發(fā)現(xiàn)了并發(fā)修改,它也是會繼續(xù)執(zhí)行下去的,它不會拋出ConcurrentModificationException,因此它的迭代器可能會反映出迭代過程中的并發(fā)修改,預期迭代結果可能和調用的瞬間情況有出入。雖然迭代器在并發(fā)時不會拋出異常,但是仍然不建議多線程并發(fā)使用此迭代器。6、繼承了HashMap的一些性質。7、不支持clone,不過clone本身用得就很少,用Map拷貝構造創(chuàng)建一個實例能夠做到一樣的效果。基本結構的簡單示意圖如下(圓形中的數(shù)字是亂寫的,非真實數(shù)據(jù))。
// 默認初始化容量,這個容量指的是所有Segment中的hash桶的數(shù)量和static final int DEFAULT_INITIAL_CAPACITY = 16;// 每個Segment的默認的加載因子static final float DEFAULT_LOAD_FACTOR = 0.75f;// 整個Map的默認的并發(fā)級別,代表最大允許16個線程同時進行并發(fā)修改操作。實際并發(fā)級別要是 2^n 這種數(shù),同時這個變量也是數(shù)組segments的長度static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 每個Segment的最大hash桶的大小(數(shù)組Segment.table的最大長度),初始容量也不能大于此值static final int MAXIMUM_CAPACITY = 1 << 30;// 數(shù)組segments的最大長度,也是最大并發(fā)級別static final int MAX_SEGMENTS = 1 << 16; // slightly conservative,翻譯過來是略保守,實際遠遠用不到這么多Segment啊// size方法和containValue方法最大的不加鎖嘗試次數(shù)// 簡單點說就是先不加鎖執(zhí)行一次,如果沒發(fā)現(xiàn)改變,就返回,發(fā)現(xiàn)改變就再重復一次,再發(fā)現(xiàn)改變就所有對segment都加鎖再操作一次,這樣設計主要是為了避免加鎖提高效率。static final int RETRIES_BEFORE_LOCK = 2;2、變量// 段掩碼,跟子網(wǎng)掩碼以及HashMap中的 table.length - 1 的作用差不多,都是用為了用位運算加速hash散列定位final int segmentMask;// 段偏移量,定位到一個segment使用的是hash值的高位,segments數(shù)組長度為2^n的話,此數(shù)值為32-n,也就是把最高的n位移動到最低的n位final int segmentShift;// 段數(shù)組final Segment<K,V>[] segments;// 下面都懂transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;transient Collection<V> values;三、基本類
也就是HashEntry和Segments,這兩個是所有方法實現(xiàn)的基礎。先看看HashEntry,代碼如下:static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SupPRessWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry[i]; }}HashEntry相對HashMap.Entry的變化:1、value變?yōu)関olatile了,對多線程具有相同的可見性,直接讀取value、給value賦值為一個已經(jīng)知道的值(就是寫value可以不依賴它當前的值),這兩個操作是是不用加鎖的;2、next變?yōu)閒inal了,這也就是說不能從HashEntry鏈的中間或尾部添加或刪除節(jié)點,因為這需要修改 next 引用值。這讓刪除變得復雜了一些,要復制前面的節(jié)點并重新添加,刪除的效率變低。說兩個問題:1、為什么使用final修飾next?key和hash因為是判別兩個hashEntry是否“相等”的重要屬性,實際中不允許修改,設置為final合情合理,更重要的原因是擴容時有用到hash值不會改變這個重要的前提。擴容時一部分節(jié)點重用,本質上和HashMap1.8中的高低位時一個思想。next指針在刪除節(jié)點時需要被修改,設置為final使得remove操作很麻煩。這個不是為了remove不影響正在進行在HashEntry鏈的讀操作,雖然這個類是弱一致的,但是是允許迭代讀時看到其他線程的修改的,再對比看下1.7的,可以印證這一點。關于這一點,需要知道Java中final的內(nèi)存語義。關于final的內(nèi)存語義,可以看下這篇文章,我這里結合HashEntry簡單說下。final的內(nèi)存語義,能夠保證讀線程讀去一個HashEntry節(jié)點時,它的next指針在這之前已經(jīng)被正確初始化了(構造方法中this不提前逸出的情況下,HashEntry滿足這一點)。如果使用普通的變量next,處理器重排序后,有可能讀線程先讀取到這個節(jié)點引用,然后通過引用讀取next的值。next的初始化賦值因為重排序,可能被放在后面再執(zhí)行,這種情況下讀取到的next是默認值null。因為get/put/remove/replace都需要使用next指針進行鏈表遍歷,在遍歷鏈表時,next == null本身就是一個正常的狀態(tài) ,因此這種重排序基本上對所有讀寫操作都有嚴重的影響,會造成混淆,所有方法的準確性大大降低。然后,因為next == null是正常狀態(tài),不能像value一樣,在讀取到null時使用readUnderLock再來一次(這也是ConcurrentHashMap為什么不能允許null value的一個原因)。為了避免上面說的這種問題,把next設置為final的,這樣就能保證不會讀取到未被初始化賦值時的默認值null,遍歷時讀到null就一定能保證是遍歷到了鏈表末尾。2、jdk1.7開始不再使用final next,改為使用volatile next。
這樣在jdk1.5(jsr133內(nèi)存模型改動)之后也是也是可以的,能夠避免讀取到未被初始化賦值的next。可以參考下這篇講volatile的和這篇講雙檢鎖單例的。這些代碼是1.5時寫的,1.6基本沒變。在寫這段代碼時作者對內(nèi)存模型理解有些偏差(看下這里),所以沒有使用volatile。1.7開始就改為volatile next,并使用Unsafe.putOrderedObject來降低volatile的寫開銷,盡量提升性能。因為理解上的偏差,所以才有了readUnderLock方法。readUnderLock是在讀取到value == null時重新加鎖讀一次,避免讀取到的value是未執(zhí)行初始化賦值的默認值null。在jsr133后的新的內(nèi)存模型中,讀取到value == null是不可能的發(fā)生的。因此1.7開始,這個設計直接去掉了,因為本身就不允許null存進來,所以不會讀取到,也談不上加鎖再讀一次。這個問題本身是1.5中的,但是1.6的代碼基本沒實質性改動,繼承了這個問題,1.7的改進了不少,修復了這個問題。另外再扯下,為什么不允許null value?value好像并不影響Map的特性,網(wǎng)上簡單找了下,大概四個原因:1、最初設計中,null value可以區(qū)別是否要加鎖讀,如果允許null value,那么不好區(qū)別要不要加鎖,現(xiàn)在這已經(jīng)不需要了;2、containsKey方法的有些實現(xiàn)是 get(key) != null,這里也需要區(qū)別出 null value,但是可以不這樣實現(xiàn);3、為了完全替換Hashtable,這個類是不允許null value的;4、歷史原因,最開始是null value,后面的版本也不好改了,就保留下來了。再看下Segment,很多要說的都在代碼上寫注釋了,有些注釋只是簡單的翻譯下,各位還是要自己看下源碼。
個人覺得一些要注意的點:1、Segment本質上還是一個hash表,很多操作跟HashMap很像,所以有HahsMap的基礎,理解這些代碼就很簡單了。ConcurrentHashMap的操都是由segment對應的操作加上一些額外策略實現(xiàn)的。2、直接讀取操作是不加鎖的,讀取null值才加鎖讀一次。這一點上面講了,新內(nèi)存模型中已經(jīng)不會發(fā)生了,jdk1.7中加鎖讀也不要了,get在任何情況下都不加鎖,所以這一點可以不管了。3、寫操作replace/remove/put/clear都需要加鎖,擴容rehash操作只在put內(nèi)部的調用,所以也是隱含加鎖的,寫操作要聯(lián)系HashEntry的性質(final next)來看。4、擴容和HashMap有些細小的區(qū)別,這個可以看下面寫的注釋。// Segment是一個特殊的hash表,寫上繼承ReentrantLock只是只是為了簡化一些鎖定,避免單獨new一個ReentrantLock。static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; // 類似size,外部修改Segment結構的都會修改這個變量,它是volatile的,幾個讀方法在一開始就判斷這個值是否為0,能盡量保證不做無用的讀取操作。 transient volatile int count; // 跟集合類中的modCount一樣,檢測到這個數(shù)值變化說明Segment一定有被修改過 // 因為modCount不是volatile,有可能無法反映出一次修改操作的中間狀態(tài)(歷史的一致狀態(tài)/未來的一致狀態(tài)), // 這些狀態(tài)本身在當時就不應該被看見,看見了也沒事,所以使用普通變量也合理 transient int modCount; // 擴容閾值 transient int threshold; // 類似HashMap的table數(shù)組 transient volatile HashEntry<K,V>[] table; // 加載因子,所有segment都是一樣的,放在這里就不需要依賴外部類了 final float loadFactor; Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } @SuppressWarnings("unchecked") static final <K,V> Segment<K,V>[] newArray(int i) { return new Segment[i]; } // 只有持有本segment的鎖或者是構造方法中才能調用這個方法,反序列化構造時也有用這個 void setTable(HashEntry<K,V>[] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } // 跟indexFor算法一樣 HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } // 加鎖讀取value,在直接讀取value得到null時調用 // 源碼這里有英文注釋:讀到value為null,只有當某種重排序的HashEntry初始化代碼讓volatile變量初始化重排序到構造方法外面時才會出現(xiàn), // 這一點舊的內(nèi)存模型下是合法的,但是不知道會不會發(fā)生。 // 具體怎么發(fā)生,就是節(jié)點的構造方法執(zhí)行完了,但是value的賦值重排序到構造方法外面,然后節(jié)點被引用了,掛載成了HashEntry鏈第一個節(jié)點, // 此時可以讀取到這個節(jié)點,但是value的賦值不一定被執(zhí)行了,value是默認值null // 搜下stackoverflow,說在新內(nèi)存模型下已經(jīng)不會發(fā)生了,是作者自己理解有些問題,后續(xù)的1.7也修復了 // http://stackoverflow.com/questions/5002428/concurrenthashmap-reorder-instruction/5144515#5144515 V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } } // 下面的方法是為了實現(xiàn)map中的方法,名字都很像 // 直接讀value是不用加鎖的,碰到讀到value == null,才加鎖再讀一次,這個前面說了,后面的也一樣 V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) // ConcurrentHashMap禁止添加null key,所以這里不用考慮key為null的情況,下同 return true; e = e.next; } } return false; } // 跟get差不多 boolean containsValue(Object value) { if (count != 0) { // read-volatile HashEntry<K,V>[] tab = table; int len = tab.length; for (int i = 0 ; i < len; i++) { for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) { V v = e.value; if (v == null) // recheck v = readValueUnderLock(e); if (value.equals(v)) return true; } } } return false; } // 幾個寫操作,需要加鎖,下同 boolean replace(K key, int hash, V oldValue, V newValue) { lock(); try { HashEntry<K,V> e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; boolean replaced = false; if (e != null && oldValue.equals(e.value)) { // replace不會修改modCount,這降低了containValue方法的準確性,jdk1.7修復了這一點 replaced = true; e.value = newValue; } return replaced; } finally { unlock(); } } V replace(K key, int hash, V newValue) { lock(); try { HashEntry<K,V> e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { // replace不會修改modCount,這降低了containValue方法的準確性,jdk1.7修復了這一點 oldValue = e.value; e.value = newValue; } return oldValue; } finally { unlock(); } } // 基本思路跟1.6的HashMap一樣 V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; // 先執(zhí)行擴容,再添加節(jié)點,1.6的HashMap是先添加節(jié)點,再擴容 // 并且Segment這里是先用大于號判斷大小,再count++,擴容時實際容量會比HashMap中同情況時多一個, // 會出現(xiàn)put完成后Segment.count > threshold應該擴容但是卻沒有擴容的情況,這不太符合設計 // eg:threshold = 12,那么在這個Segment上執(zhí)行第13次put時,判斷語句為 12++ > 12,為false,執(zhí)行完成后Segment.count = 13,threshold = 12 // 另外默認構造情況下threshold = 0,而且每個Segment的table.length都為1,如果更改為符合設計的思路的擴容方式,第一次put又一定會觸發(fā)擴容 // 1.7修改了上面兩點,判斷語句先讓count加1,再用大于號執(zhí)行判斷,同時讓table.length 最小值為2,這樣第一次put也不會擴容,完善了整體的擴容機制 if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); // 定位方式和1.6的HashMap一樣 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) // put相同的key(相當于replace)不會修改modCount,這降低了containsValue方法的準確性,jdk1.7修復了這一點 e.value = value; } else { oldValue = null; ++modCount; // 也是添加在HashEntry鏈的頭部,前面說了,這里的HashEntry的next指針是final的,new后就不能變 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; } return oldValue; } finally { unlock(); } } // 擴容時為了不影響正在進行的讀線程,最好的方式是全部節(jié)點復制一次并重新添加 // 這里根據(jù)擴容時節(jié)點遷移的性質,最大可能的重用一部分節(jié)點,這個性質跟1.8的HashMap中的高低位是一個道理,必須要求hash值是final的 void rehash() { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; // 這里有段注釋是說下面的算法及其作用的,跟1.8的HashMap的resize中用到的那個高低溫的原理一樣: // 擴容前在一個hash桶中的節(jié)點,擴容后只會有兩個去向。這里是用 &,后續(xù)改為用高低位,實質上是一樣的。 // 根據(jù)這個去向,找到最末尾的去向都一樣的連續(xù)的一部分,這部分可以重用,不需要復制 // HashEntry的next是final的,resize/rehash時需要重新new,這里的特殊之處就是最大程度重用HashEntry鏈尾部的一部分,盡量減少重新new的次數(shù) // 作者說從統(tǒng)計角度看,默認設置下有大約1/6的節(jié)點需要被重新復制一次,所以通常情況還是能節(jié)省不少時間的 HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); // 跟1.6的HashMap有一樣的小問題,可能會過早變?yōu)镮nteger.MAX_VALUE從而導致后續(xù)永遠不能擴容 int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity ; i++) { // We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin. // 為了保證其他線程能夠繼續(xù)執(zhí)行讀操作,不執(zhí)行手動將原來table賦值為null,只是再最后修改一次table的引用 // 1.6的HashMap對應的地方有個src[j] = null,這句話在多線程時某些程度上能加快回收舊table數(shù)組,但是放在這里會影響其他線程的讀操作 // 只要其他線程完成了讀操作,就不會再引用舊HashEntry,舊的就會自動被垃圾回收器回收 // 下面有個關于resize中重用節(jié)點的示意圖,可以看下 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; // Single node on list if (next == null) newTable[idx] = e; else { // Reuse trailing consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; // 這個循環(huán)是尋找HashEntry鏈最大的可重用的尾部 // 看過1.8的HashMap就知道,如果hash值是final的,那么每次擴容,擴容前在一條鏈表上的節(jié)點,擴容后只會有兩個去向 // 這里重用部分中,所有節(jié)點的去向相同,它們可以不用被復制 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 把重用部分整體放在擴容后的hash桶中 // 復制不能重用的部分,并把它們插入到rehash后的所在HashEntry鏈的頭部 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value); } // 這里也可以看出,重用部分rehash后相對順序不變,并且還是在rehash后的鏈表的尾部 // 不能重用那些節(jié)點在rehash后,再一個個重頭添加到鏈表的頭部,如果還在一條鏈表上面,那么不能重用節(jié)點的相對順序會顛倒 // 所以ConcurrentHashMap的迭代順序會變化,本身它也不保證迭代順序不變 } } } table = newTable; } // 因為1.6的HashEntry的next指針是final的,所以比普通的鏈表remove要復雜些,只有被刪除節(jié)點的后面可以被重用,前面的都要再重新insert一次 V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay in list, but all preceding ones need to be cloned. // 因為next指針是final的,所以刪除不能用簡單的鏈表刪除,需要把前面的節(jié)點都重新復制再插入一次,后面的節(jié)點可以重用 // 刪除后,后面的可以重用的那部分順序不變且還是放在最后,前面的被復制的那部分順序顛倒地放在前面 // 后面Map.remove那里有個remove的示意圖 ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } } void clear() { if (count != 0) { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) tab[i] = null; ++modCount; count = 0; // write-volatile } finally { unlock(); } } }}下面是jdk1.6 ConcurrentHashMap Segment resize中重用節(jié)點的示意圖。
四、構造方法
這里就很簡單了,看下代碼和注釋就行。public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找適合的2^n,以及n,默認構造時是16和4 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 定位Segment的index時hash值的移位,為32 - n,默認構造時是28 segmentMask = ssize - 1; // 用于 & 運算定位Segment的index,默認構造時是0x0f this.segments = Segment.newArray(ssize); // 實際的concurrentLevel就是Segment數(shù)組的長度 if (initialCapacity > MAXIMUM_CAPACITY) // 雖然整個Map的capacity可以大于MAXIMUM_CAPACITY,但是初始化時為了滿足2^n的要求,最大也只允許這么多 initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; // 上面這段,求每個Segment代表的實際的hash表的table數(shù)組的長度(專業(yè)點叫hash桶的數(shù)量),這個值是cap,默認構造時是1 // 參數(shù)initailCapacity是一個容量參考值,用來計算每個Segment在初始化時有多少個hash桶(這個桶的數(shù)量要符合2^n) // 整個ConcurrentHashMap的容量capacity是所有Segment的容量和,在初始化時每個Segment都一樣, // 但是每個Segment都可以單獨擴容,所以構造完成后這個capacity值就沒啥用了 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment< K,V > (cap, loadFactor);}public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m);}五、一些基礎方法
有了HashMap作為基礎,這里就簡單在注釋上說下。// Wang/Jenkins hash的變種算法// 因為算出來的hash的最高的幾位用于定位段,最低的幾位用于段內(nèi)定位hash桶,所以高位低位都需要擾動,要兩個方向移位// HashMap中算出來的hash值只有最低的幾位會被用到,所以只對需要對低位進行操作,一個方向移位就行 ,不需要下面這種更復雜的hash函數(shù)private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-Word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16);}// 定位到某個具體的Segment,使用hash值的高位,算法和HashMap的indexFor一樣// ConcurrentHashMap中沒有抽象出indexFor了,實際實現(xiàn)還是一樣的,看Segment的代碼final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask];}這里可以看到,ConcurrentHashMap的有兩種hash散列定位,分別使用的是hash值中不同的地方,所以理論上ConcurrentHashMap的沖突要比同參數(shù)的HashMap要少。因為ConcurrentHashMap每個Segment都可以單獨擴容,擴容方法移動到Segment里面去了,Segment.rehash就是擴容方法。六、常用方法1、讀方法比較簡單,核心思路是代理到對應的Segment去做相應的操作,要批量讀取所有Sement的方法特殊處理下。// ConcurrentHashMap.get是通過hash值定位到Segment,有這個Segment的get來完成的public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);}// 同getpublic boolean containsKey(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).containsKey(key, hash);}// containsValue和size雖然是讀操作,但是會批量讀取到所有Segment,所以特殊處理// 先不加鎖嘗試兩次以獲得比較近似的結果,如果contains就直接返回(因為有一個存在就是存在,不存在才需要遍歷全部),不contains才繼續(xù),如果發(fā)現(xiàn)modCount被其他線程修改,就全部加鎖再執(zhí)行// 不加鎖讀兩次時,可能會碰見寫操作的中間狀態(tài),也可能在循環(huán)到后面時有線程修改了前面,所以這個方法不是100%準確的// 設計成這樣主要是為了提高效率,很多業(yè)務還是可以接受這種誤差,需要更強一致性的時候,可以自己寫個方法// 上面Segment的分析中指出了,put相同的key、replace方法不會修改modCount,但是會改變value,這一點使得后面檢測modCount是否改變可能成為無用功,讓containsValue方法的準確性降低了,1.7進行了修復public boolean containsValue(Object value) { if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; int[] mc = new int[segments.length]; // 先不加鎖執(zhí)行RETRIES_BEFORE_LOCK = 2次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { int sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; // 這個c沒哪里用,意義不明 mcsum += mc[i] = segments[i].modCount; // 就是 mc[i] = segments[i].count; mssum += mc[i],臨時保存一份modCount if (segments[i].containsValue(value)) // 碰見contains直接return return true; } boolean cleanSweep = true; // mcsum是modCount的和,為0可以認為遍歷開始時沒有任何put完成過任何HashEntry,即方法開始執(zhí)行時不包含任何HashEntry,可以認為(近似認為,幾率比較大)此時也不包含 if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; if (mc[i] != segments[i].modCount) { // modCount改變,說明有其他線程修改了Segment的結構,退出循環(huán)。會有replace的問題,前面說了 cleanSweep = false; break; } } } if (cleanSweep) return false; } // 如果連續(xù)兩次都碰見modCount改變的情況,這時候一次性對全部Segment加鎖,最大程度保證遍歷時的一致性 // 因為是全部加鎖后再遍歷,遍歷開始后沒有線程可以修改任何Segment的結構,可以保證當前線程得到的是準確值 for (int i = 0; i < segments.length; ++i) segments[i].lock(); boolean found = false; try { for (int i = 0; i < segments.length; ++i) { if (segments[i].containsValue(value)) { found = true; break; } } } finally { for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } return found;}// 不是Map接口的方法,為了兼容Hashtable,等價于containsValuepublic boolean contains(Object value) { return containsValue( value);}// 基本同containValue,但是只執(zhí)行一次且不會加鎖public boolean isEmpty() { final Segment<K,V>[] segments = this.segments; int[] mc = new int[segments.length]; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0) return false; else mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0 || mc[i] != segments[i].modCount) return false; } } return true;}// 跟containsValue差不多,但是size不會受put相同的key、replace方法的影響// 注意最后一個int溢出處理,因為HashMap以及ConcurrentHashMap是個特殊的集合類,我們通常所說的容量是hash桶的數(shù)目,這并不是實際容量// 因為使用鏈表解決hash沖突的原因,實際的可以容納得更多,可能會遠遠超多Integer.MAX_VALUE,這這時返回值就是個錯誤的值,但還是盡量返回了一個“比較有用”的值。// 這純粹是歷史原因造成的坑,返回個int,沒考慮實際情況,1.8的新增了一個mappingCount方法,返回long型準確數(shù)字public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // 不加鎖執(zhí)行兩次,如果兩次數(shù)據(jù)不一樣,或者碰到modCount++被修改了,就全部加鎖在執(zhí)行一次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) // int溢出處理,因此返回值可能會是錯誤的。 // 并且因為兼容性的原因,這個還無法解決,只能新增一個方法,1.8的ConcurrentHashMap就是新增了一個返回long型的方法 return Integer.MAX_VALUE; else return (int)sum;}2、寫方法基本思路還是通過hash定位到Segment,由Segment完成。注意下putAll和clear這兩個批量操作。// ConcurrentHashMap.put是通過hash值定位到Segment,有這個Segment的put來完成的// ConcurrentHashMap的實現(xiàn)中不允許null key和null valuepublic V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); // 這里 null key 會拋出NPE return segmentFor(hash).put(key, hash, value, false);}// 下面幾個基本思路同put,都是代理給相應的Segment的對應方法進行操作public V putIfAbsent(K key, V value);public V remove(Object key);public boolean remove(Object key, Object value);public boolean replace(K key, V oldValue, V newValue);public V replace(K key, V value);// putAll和clear都是循環(huán)操作,沒有全局加鎖,在執(zhí)行期間還是可以執(zhí)行完成其他的寫操作的,事務性比較差的方法,設計成不用全局鎖是為了提高效率public void putAll(Map<? extends K, ? extends V> m) { for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) put(e.getKey(), e.getValue());}public void clear() { for (int i = 0; i < segments.length; ++i) segments[i].clear();}put的簡單個示意圖如下(圓形中的數(shù)字是亂寫的,非真實數(shù)據(jù))。remove的簡單示意圖如下(圓形中的數(shù)字是亂寫的,非真實數(shù)據(jù))。
七、視圖以及迭代器
視圖指的的 keySet()、values()、entrySet()、keys()、elements()這幾個方法,三個視圖屬性實質上也是用這幾個方法。視圖的操作都是基本上代理給map本身來進行操作,因此視圖中進行寫操作(一般是remove,沒實現(xiàn)add),也會造成map發(fā)生更改。特別注意下values的類型就是Collection,因此操作都很原始,沒有任何子類的特性可以使用,平時盡量也不要用這個。迭代器不使用沒有全局加鎖,沒有使用modCount機制,允許迭代時有其他線程并發(fā)進行寫操作,因此ConcurrentHashMap的迭代器是弱一致的,也不是快速失敗的。它永遠不會拋出ConcurrentModificationException,也就是迭代過程允許Map被修改,這些修改可以(但是不保證絕對可以)實時地反映在當前的迭代結果中。跟HashMap的迭代器有些區(qū)別,CoincurrentHashMap的迭代器是”逆序“的,也就是在segments、Segment.table中都是從大下標開始迭代。這個應該是為了降低迭代與寫操作的沖突,但是感覺作用并不明顯啊。1.8的ConcurrentHashMap中迭代和擴容就是逆序的,但是1.8的這一點很明顯能降低迭代操作與寫操作沖突。1.6的這里感覺最多是為了clear()、putAll之類的沖突減少。另外,由于實際的Map.Entry,也就是內(nèi)部類HashEntry中沒有public get/set方法,返回的EntrySet無法被使用,這里多了一個簡單的封裝類WriteThroughEntry entends Abstract.SimpleEntry,用于封裝外部在迭代操作對Entry的寫操作。HashMap中的HashMap.Entry本身就有public get/set方法,所以HashMap的EntrySet直接返回HashMap.Entry類型的Set就行,不必再封裝一次。final class WriteThroughEntry extends AbstractMap.SimpleEntry<K,V>{ WriteThroughEntry(K k, V v) { super(k,v); } public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; }}特別的一點,這里會進行一次put操作,保證value所在的hashEntry在當時是一定存在的。這里把setValue 看成是 put相同的key,如果此時value不存在,也會添加一個進去。另外,由于讀操作不加鎖,因此value返回的也是一個過期的值,ConcurrentHashMap本身也保證不了更多。1.6的代碼跟基本1.5的一樣,都是比較古老的代碼了。之后的1.7/1.8都進行不少優(yōu)化,實現(xiàn)上也變化了很多,變得更加復雜了。學習1.6的很重要的一點,就是它把ConcurrentHashMap的基本的原理、分段鎖思想、各個方法的基本實現(xiàn),講得足夠清楚、簡單,這一點很重要。現(xiàn)在各種編程語言的使用成本在降低,但是相對的學習成本變高了,為了更加強大、實用,許多標準api都進行了各種細節(jié)上的優(yōu)化,代碼不再簡單(比如1.6的這個類的源碼總共1282行,到了1.8變成了6313行,有大概40%的是函數(shù)式、Stream相關的,也就是基本的功能用了大約3倍的代碼量來實現(xiàn)),想弄清楚一個基本的思路都得看半天。學習最好是要從簡單、基礎的東西開始,1.6的源碼也是集合類中的基礎,看懂了,理解后續(xù)版本就會簡單了。下一篇說下1.7的ConcurrentHashMap,它相對1.6來說,改動不算特別大。以上內(nèi)容,如有問題,煩請指出,謝謝!
新聞熱點
疑難解答