併發程式設計(十六)——java7 深入併發包 ConcurrentHashMap 原始碼解析
以前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操作,就是rehash,這個會重新將原陣列的內容重新hash到新的擴容陣列中,在多執行緒的環境下,存在同時其他的元素也在進行put操作,如果hash值相同,可能出現同時在同一陣列下用連結串列表示,造成閉環,導致在get時會出現死迴圈,所以HashMap是執行緒不安全的。
JDK1.7的實現
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。
簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。
concurrencyLevel:並行級別、併發數、Segment 數。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。
再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些。
初始化
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。
1 public ConcurrentHashMap(int initialCapacity, 2float loadFactor, int concurrencyLevel) { 3if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 4throw new IllegalArgumentException(); 5if (concurrencyLevel > MAX_SEGMENTS) 6concurrencyLevel = MAX_SEGMENTS; 7// Find power-of-two sizes best matching arguments 8int sshift = 0; 9int ssize = 1; 10// 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方 11while (ssize < concurrencyLevel) { 12++sshift; 13ssize <<= 1; 14} 15// 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4 16// 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值 17this.segmentShift = 32 - sshift; 18this.segmentMask = ssize - 1; 19 20if (initialCapacity > MAXIMUM_CAPACITY) 21initialCapacity = MAXIMUM_CAPACITY; 22 23// initialCapacity 是設定整個 map 初始的大小, 24// 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小 25// 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個 26int c = initialCapacity / ssize; 27if (c * ssize < initialCapacity) 28++c; 29// 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上, 30// 插入一個元素不至於擴容,插入第二個的時候才會擴容 31int cap = MIN_SEGMENT_TABLE_CAPACITY; 32while (cap < c) 33cap <<= 1; 34 35// 建立 Segment 陣列, 36// 並建立陣列的第一個元素 segment[0] 37Segment<K,V> s0 = 38new Segment<K,V>(loadFactor, (int)(cap * loadFactor), 39(HashEntry<K,V>[])new HashEntry[cap]); 40Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; 41// 往陣列寫入 segment[0] 42UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] 43this.segments = ss; 44 }
初始化完成,我們得到了一個 Segment 陣列。
我們就當是用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:
- Segment 陣列長度為 16,不可以擴容
- Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
- 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
- 當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
Segment
1 static class Segment<K,V> extends ReentrantLock implements Serializable { 2 3transient volatile HashEntry<K,V>[] table; 4 5transient int count; 6 7transient int modCount; 8 9 }
從上Segment的繼承體系可以看出,Segment實現了ReentrantLock,也就帶有鎖的功能,table使用volatile修飾,保證了記憶體可見性。
put 過程分析
我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。
1 public V put(K key, V value) { 2Segment<K,V> s; 3if (value == null) 4throw new NullPointerException(); 5// 1. 計算 key 的 hash 值 6int hash = hash(key); 7// 2. 根據 hash 值找到 Segment 陣列中的位置 j 8//hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位, 9//然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是槽的陣列下標 10int j = (hash >>> segmentShift) & segmentMask; 11// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null, 12// ensureSegment(j) 對 segment[j] 進行初始化 13if ((s = (Segment<K,V>)UNSAFE.getObject// nonvolatile; recheck 14(segments, (j << SSHIFT) + SBASE)) == null) //in ensureSegment 15s = ensureSegment(j); 16// 3. 插入新值到 槽 s 中 17return s.put(key, hash, value, false); 18 }
初始化槽: ensureSegment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。
這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
1 private Segment<K,V> ensureSegment(int k) { 2final Segment<K,V>[] ss = this.segments; 3long u = (k << SSHIFT) + SBASE; // raw offset 4Segment<K,V> seg; 5if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { 6// 這裡看到為什麼之前要初始化 segment[0] 了, 7// 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k] 8// 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了 9Segment<K,V> proto = ss[0]; 10int cap = proto.table.length; 11float lf = proto.loadFactor; 12int threshold = (int)(cap * lf); 13 14// 初始化 segment[k] 內部的陣列 15HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; 16if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) 17== null) { // 再次檢查一遍該槽是否被其他執行緒初始化了。 18 19Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); 20// 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出,如果其他執行緒成功設定後,這裡獲取到直接返回 21while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) 22== null) { 23if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) 24break; 25} 26} 27} 28return seg; 29 }
總的來說,ensureSegment(int k) 比較簡單,對於併發操作使用 CAS 進行控制。
第一層很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。
Segment 內部是由陣列+連結串列組成的。
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2// 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖 3//先看主流程,後面還會具體介紹這部分內容 4HashEntry<K,V> node = tryLock() ? null : 5scanAndLockForPut(key, hash, value); 6V oldValue; 7try { 8// 這個是 segment 內部的陣列 9HashEntry<K,V>[] tab = table; 10// 再利用 hash 值,求應該放置的陣列下標 11int index = (tab.length - 1) & hash; 12// first 是陣列該位置處的連結串列的表頭 13HashEntry<K,V> first = entryAt(tab, index); 14 15// 下面這串 for 迴圈雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個連結串列這兩種情況 16for (HashEntry<K,V> e = first;;) { 17if (e != null) { 18K k; 19if ((k = e.key) == key || 20(e.hash == hash && key.equals(k))) { 21oldValue = e.value; 22if (!onlyIfAbsent) { 23// 覆蓋舊值 24e.value = value; 25++modCount; 26} 27break; 28} 29// 繼續順著連結串列走 30e = e.next; 31} 32else { 33// node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。 34// 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。 35if (node != null) 36node.setNext(first); 37else 38node = new HashEntry<K,V>(hash, key, value, first); 39 40int c = count + 1; 41// 如果超過了該 segment 的閾值,這個 segment 需要擴容 42if (c > threshold && tab.length < MAXIMUM_CAPACITY) 43rehash(node); // 擴容後面也會具體分析 44else 45// 沒有達到閾值,將 node 放到陣列 tab 的 index 位置, 46// 其實就是將新的節點設定成原連結串列的表頭 47setEntryAt(tab, index, node); 48++modCount; 49count = c; 50oldValue = null; 51break; 52} 53} 54} finally { 55// 解鎖 56unlock(); 57} 58return oldValue; 59 }
整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裡面的併發問題,我們稍後再進行介紹。
到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。
獲取寫入鎖: scanAndLockForPut
前面我們看到,在往某個 segment 中 put 的時候,首先會呼叫 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。
下面我們來具體分析這個方法中是怎麼控制加鎖的。
1 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { 2HashEntry<K,V> first = entryForHash(this, hash); 3HashEntry<K,V> e = first; 4HashEntry<K,V> node = null; 5int retries = -1; // negative while locating node 6 7// 迴圈獲取鎖 8while (!tryLock()) { 9HashEntry<K,V> f; // to recheck first below 10if (retries < 0) { 11if (e == null) { 12if (node == null) // speculatively create node 13// 進到這裡說明陣列該位置的連結串列是空的,沒有任何元素 14// 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在併發,不一定是該位置 15node = new HashEntry<K,V>(hash, key, value, null); 16retries = 0; 17} 18else if (key.equals(e.key)) 19retries = 0; 20else 21// 順著連結串列往下走 22e = e.next; 23} 24// 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞佇列等待鎖 25//lock() 是阻塞方法,直到獲取鎖後返回 26else if (++retries > MAX_SCAN_RETRIES) { 27lock(); 28break; 29} 30else if ((retries & 1) == 0 && 31// 這個時候是有大問題了,那就是有新的元素進到了連結串列,成為了新的表頭 32//所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法 33(f = entryForHash(this, hash)) != first) { 34e = first = f; // re-traverse if entry changed 35retries = -1; 36} 37} 38return node; 39 }
這個方法有兩個出口,一個是 tryLock() 成功了,迴圈終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。
這個方法就是看似複雜,但是其實就是做了一件事,那就是獲取該 segment 的獨佔鎖,如果需要的話順便例項化了一下 node。
獲取鎖時,並不直接使用lock來獲取,因為該方法獲取鎖失敗時會掛起。事實上,它使用了自旋鎖,如果tryLock獲取鎖失敗,說明鎖被其它執行緒佔用,此時通過迴圈再次以tryLock的方式申請鎖。如果在迴圈過程中該Key所對應的連結串列頭被修改,則重置retry次數。如果retry次數超過一定值,則使用lock方法申請鎖。
這裡使用自旋鎖是因為自旋鎖的效率比較高,但是它消耗CPU資源比較多,因此在自旋次數超過閾值時切換為互斥鎖。
擴容: rehash
重複一下,segment 陣列不能擴容,擴容是 segment 陣列某個位置內部的陣列 HashEntry\<k,v>[] 進行擴容,擴容後,容量為原來的 2 倍。
首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。
該方法不需要考慮併發,因為到這裡的時候,是持有該 segment 的獨佔鎖的。
1 // 方法引數上的 node 是這次擴容後,需要新增到新的陣列中的資料。 2 private void rehash(HashEntry<K,V> node) { 3HashEntry<K,V>[] oldTable = table; 4int oldCapacity = oldTable.length; 5// 2 倍 6int newCapacity = oldCapacity << 1; 7threshold = (int)(newCapacity * loadFactor); 8// 建立新陣列 9HashEntry<K,V>[] newTable = 10(HashEntry<K,V>[]) new HashEntry[newCapacity]; 11// 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進位制 ‘000...00011111’ 12int sizeMask = newCapacity - 1; 13 14// 遍歷原陣列,老套路,將原陣列位置 i 處的連結串列拆分到 新陣列位置 i 和 i+oldCap 兩個位置 15for (int i = 0; i < oldCapacity ; i++) { 16// e 是連結串列的第一個元素 17HashEntry<K,V> e = oldTable[i]; 18if (e != null) { 19HashEntry<K,V> next = e.next; 20// 計算應該放置在新陣列中的位置, 21// 假設原陣列長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19 22int idx = e.hash & sizeMask; 23if (next == null)// 該位置處只有一個元素,那比較好辦 24newTable[idx] = e; 25else { // Reuse consecutive sequence at same slot 26// e 是連結串列表頭 27HashEntry<K,V> lastRun = e; 28// idx 是當前連結串列的頭結點 e 的新位置 29int lastIdx = idx; 30 31// 下面這個 for 迴圈會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的 32for (HashEntry<K,V> last = next; 33last != null; 34last = last.next) { 35int k = last.hash & sizeMask; 36if (k != lastIdx) { 37lastIdx = k; 38lastRun = last; 39} 40} 41// 將 lastRun 及其之後的所有節點組成的這個連結串列放到 lastIdx 這個位置 42newTable[lastIdx] = lastRun; 43// 下面的操作是處理 lastRun 之前的節點, 44//這些節點可能分配在另一個連結串列中,也可能分配到上面的那個連結串列中 45for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { 46V v = p.value; 47int h = p.hash; 48int k = h & sizeMask; 49HashEntry<K,V> n = newTable[k]; 50newTable[k] = new HashEntry<K,V>(h, p.key, v, n); 51} 52} 53} 54} 55// 將新來的 node 放到新陣列中剛剛的 兩個連結串列之一 的 頭部 56int nodeIndex = node.hash & sizeMask; // add the new node 57node.setNext(newTable[nodeIndex]); 58newTable[nodeIndex] = node; 59table = newTable; 60 }
總結一下put的流程:
當執行put操作時,會進行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化,即通過CAS操作進行賦值,然後進行第二次hash操作,找到相應的HashEntry的位置,這裡會利用繼承過來的鎖的特性,在將資料插入指定的HashEntry位置時(連結串列的尾端),會通過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,如果獲取成功就直接插入相應的位置,如果已經有執行緒獲取該Segment的鎖,那當前執行緒會以自旋的方式去繼續的呼叫tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒。
get 過程分析
相對於 put 來說,get 真的不要太簡單。
- 計算 hash 值,找到 segment 陣列中的具體位置,或我們前面用的“槽”
- 槽中也是一個數組,根據 hash 找到陣列中具體的位置
- 到這裡是連結串列了,順著連結串列進行查詢即可
1 public V get(Object key) { 2Segment<K,V> s; // manually integrate access methods to reduce overhead 3HashEntry<K,V>[] tab; 4// 1. hash 值 5int h = hash(key); 6long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 7// 2. 根據 hash 找到對應的 segment 8if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && 9(tab = s.table) != null) { 10// 3. 找到segment 內部陣列相應位置的連結串列,遍歷 11for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile 12(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); 13e != null; e = e.next) { 14K k; 15if ((k = e.key) == key || (e.hash == h && key.equals(k))) 16return e.value; 17} 18} 19return null; 20 }
size操作
put、remove和get操作只需要關心一個Segment,而size操作需要遍歷所有的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住所有Sgment,計算完後再解鎖。但這樣做,在做size操作時,不僅無法對Map進行寫操作,同時也無法進行讀操作,不利於對Map的並行操作。
為更好支援併發操作,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,如果某相鄰兩次計算獲取的所有Segment的更新次數(每個Segment都與HashMap一樣通過modCount跟蹤自己的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程中無更新操作,則這兩次計算出的總size相等,可直接作為最終結果返回。如果這三次計算過程中Map有更新,則對所有Segment加鎖重新計算Size。該計算方法程式碼如下
1 public int size() { 2final Segment<K,V>[] segments = this.segments; 3int size; 4boolean overflow; // true if size overflows 32 bits 5long sum;// sum of modCounts 6long last = 0L;// previous sum 7int retries = -1; // first iteration isn't retry 8try { 9for (;;) { 10if (retries++ == RETRIES_BEFORE_LOCK) { 11for (int j = 0; j < segments.length; ++j) 12ensureSegment(j).lock(); // force creation 13} 14sum = 0L; 15size = 0; 16overflow = false; 17for (int j = 0; j < segments.length; ++j) { 18Segment<K,V> seg = segmentAt(segments, j); 19if (seg != null) { 20sum += seg.modCount; 21int c = seg.count; 22if (c < 0 || (size += c) < 0) 23overflow = true; 24} 25} 26if (sum == last) 27break; 28last = sum; 29} 30} finally { 31if (retries > RETRIES_BEFORE_LOCK) { 32for (int j = 0; j < segments.length; ++j) 33segmentAt(segments, j).unlock(); 34} 35} 36return overflow ? Integer.MAX_VALUE : size; 37 }
ConcurrentHashMap的Size方法是一個巢狀迴圈,大體邏輯如下:
1.遍歷所有的Segment。
2.把Segment的元素數量累加起來。
3.把Segment的修改次數累加起來。
4.判斷 所有Segment的總修改次數是否大於上一次的 總修改次數。如果大於,說明統計過程中有修改,重新統計,嘗試次數+1;如果不是。說明沒有修改,統計結束。
5.如果嘗試次數超過閾值,則對每一個Segment加鎖,再重新統計。
6.再次判斷 所有Segment 的總修改次數是否大於上一次的 總修改次數。由於已經加鎖,次數一定和上次相等。
7.釋放鎖,統計結束。
併發問題分析
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮併發問題。
新增節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨佔鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。
-
put 操作的執行緒安全性。
- 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的陣列。
- 新增節點到連結串列的操作是插入到表頭的,所以,如果這個時候 get 操作在連結串列遍歷的過程已經到了中間,是不會影響的。當然,另一個併發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
- 擴容。擴容是新建立了陣列,然後進行遷移資料,最後面將 newTable 設定給屬性 table。所以,如果 get 操作此時也在進行,那麼也沒關係,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
-
remove 操作的執行緒安全性。
remove 操作我們沒有分析原始碼,所以這裡說的讀者感興趣的話還是需要到原始碼中去求實一下的。
get 操作需要遍歷連結串列,但是 remove 操作會"破壞"連結串列。
如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。
如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要將頭結點的 next 設定為陣列該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供陣列內部操作的可見性保證,所以原始碼中使用了 UNSAFE 來運算元組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裡的併發保證就是 next 屬性是 volatile 的。