JDK原始碼分析(12)之 ConcurrentHashMap 詳解
本文將主要講述 JDK1.8 版本 的 ConcurrentHashMap,其內部結構和很多的雜湊優化演算法,都是和 JDK1.8 版本的 HashMap是一樣的,所以在閱讀本文之前,一定要先了解 HashMap,可以參考HashMap 相關;另外 ConcurrentHashMap 中同樣有紅黑樹,這部分可以先不看不影響整體結構把握,有興趣的可以檢視紅黑樹;
一、ConcurrentHashMap 結構概述
1. 整體概述
CHM 的原始碼有 6k 多行,包含的內容多,精巧,不容易理解;建議在檢視原始碼的時候,可以首先把握整體結構脈絡,對於一些精巧的優化,雜湊技巧可以先了解目的就可以了,不用深究;對整體把握比較清楚後,在逐步分析,可以比較快速的看懂;
JDK1.8 版本中的 CHM,和 JDK1.7 版本的差別非常大,在檢視資料的時候要注意區分,1.7 中主要是使用 Segment 分段鎖 來解決併發問題的;而在 1.8 中則完全沒有這些稍顯臃腫的結構,其結構基本和 HashMap 是一樣的,都是 陣列 + 連結串列 + 紅黑樹,如圖所示:
其主要區別就在 CHM 支援併發:
- 使用 Unsafe 方法運算元組內部元素,保證可見性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
- 在更新和移動節點的時候,直接鎖住對應的雜湊桶,鎖粒度更小,且動態擴充套件;
- 針對擴容慢操作進行優化,
- 首先擴容過程的中,節點首先移動到過度表 nextTable ,所有節點移動完畢時替換散列表 table ;
- 移動時先將散列表定長等分,然後逆序依次領取任務擴容,設定 sizeCtl 標記正在擴容;
- 移動完成一個雜湊桶或者遇到空桶時,將其標記為 ForwardingNode 節點,並指向 nextTable ;
- 後有其他執行緒在操作雜湊表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成後再繼續之前的操作;
- 優化雜湊表計數器,採用 LongAdder、Striped64 類似思想;
- 以及大量的雜湊演算法優化和狀態變數優化;
以上講的這些不太清楚也沒有關係,主要是有一個印象,大致清楚 CHM 的實現方向,具體細節後面還會結合原始碼詳細講解;
2. 類定義和成員變數
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { private static final int MAXIMUM_CAPACITY = 1 << 30;// 最大容量 private static final int DEFAULT_CAPACITY = 16;// 預設初始化容量 private static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 併發級別,為相容1.7,實際未用 private static final float LOAD_FACTOR = 0.75f;// 固定負載係數,n - (n >>> 2) static final int TREEIFY_THRESHOLD = 8;// 連結串列超過8時,轉為紅黑樹 static final int UNTREEIFY_THRESHOLD = 6;// 紅黑樹低於6時,轉為連結串列 static final int MIN_TREEIFY_CAPACITY = 64;// 樹化最小容量,容量小於64時,先擴容 private static final int MIN_TRANSFER_STRIDE = 16;// 擴容時拆分散列表,最小步長 private static int RESIZE_STAMP_BITS = 16; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 可參與擴容的最大執行緒 static final int NCPU = Runtime.getRuntime().availableProcessors();// CPU 數 transient volatile Node<K,V>[] table;// 散列表 private transient volatile Node<K,V>[] nextTable;// 擴容時的過度表 private transient volatile int sizeCtl;// 最重要的狀態變數,下面詳講 private transient volatile int transferIndex;// 擴容進度指示 private transient volatile long baseCount;// 計數器,基礎基數 private transient volatile int cellsBusy;// 計數器,併發標記 private transient volatile CounterCell[] counterCells;// 計數器,併發累計 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// 注意這裡不是0.75,後面介紹 this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel)// Use at least as many bins initialCapacity = concurrencyLevel;// as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor);// 注意這裡的初始化 int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; } ... }
上面有幾個重要的地方這裡單獨講:
LOAD_FACTOR:
這裡的負載係數,同 HashMap 等其他 Map 的係數有明顯區別:
-
通常的係數預設 0.75,可以由建構函式傳入,當節點數 size 超過 loadFactor * capacity 時擴容;
-
而 CMH 的係數則固定 0.75(使用
n - (n >>> 2)
表示),建構函式傳入的係數隻影響初始化容量,見第5個建構函式; -
上面第二個建構函式中,
initialCapacity + (initialCapacity >>> 1) + 1)
,這裡居然不是使用的預設0.75,可以看作bug,也可視作優化,見
sizeCtl:
sizeCtl 是 CHM 中最重要的狀態變數,其中包括很多中狀態,這裡先整體介紹幫助後面原始碼理解;
-
sizeCtl = 0 :初始值,還未指定初始容量;
-
sizeCtl > 0 :
- table 未初始化,表示初始化容量;
- table 已初始化,表示擴容閾值(0.75n);
-
sizeCtl = -1 :表示正在初始化;
-
sizeCtl < -1 :表示正在擴容,具體結構如圖所示:
計算程式碼如下:
/* * n=64 * Integer.numberOfLeadingZeros(n)=26 * resizeStamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010 */ static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
所以 resizeStamp(64) << RESIZE_STAMP_SHIFT) + 2
,表示擴容目標為 64,有一個執行緒正在擴容;
3. Node 節點
static class Node<K,V> implements Map.Entry<K,V> {// 雜湊表普通節點 final int hash; final K key; volatile V val; volatile Node<K,V> next; Node<K,V> find(int h, Object k) {}// 主要在擴容時,利用多型查詢已轉移節點 } static final class ForwardingNode<K,V> extends Node<K,V> {// 標識擴容節點 final Node<K,V>[] nextTable;// 指向成員變數 ConcurrentHashMap.nextTable ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null);// hash = -1,快速確定 ForwardingNode 節點 this.nextTable = tab; } Node<K,V> find(int h, Object k) {} } static final class TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節點 TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null);// hash = -2,快速確定紅黑樹, ... } } static final class TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節點,其 hash 同 Node 普通節點 > 0;
4. 雜湊計算
static final int MOVED= -1;// hash for forwarding nodes static final int TREEBIN= -2;// hash for roots of trees static final int RESERVED= -3;// hash for transient reservations static final int HASH_BITS = 0x7fffffff;// usable bits of normal node hash // 讓高位16位,參與雜湊桶定位運算的同時,保證 hash 為正 static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
除此之外還有,
- tableSizeFor : 將容量轉為大於n,且最小的2的冪;
- 除留餘數法 :
hash % length = hash & (length-1)
; - 擴容後雜湊桶定位:
(e.hash & oldCap)
,0 - 位置不變,1 - 原來的位置 + oldCap;
以上這些雜湊優化的具體原理,都在之前的部落格講過了,就不在重複了,HashMap 相關;
5. 雜湊桶可見性
我們都知道一個數組即使宣告為 volatile
,也只能保證這個陣列引用本身的可見性,其內部元素的可見性是無法保證的,如果每次都加鎖,則效率必然大大降低,在 CHM 中則使用 Unsafe
方法來保證:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
二、原始碼解析
1. initTable 方法
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield();// 有其他執行緒在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 設定狀態 -1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY;// 注意此時的 sizeCtl 表示初始容量,完畢後表示擴容閾值 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2);// 同 0.75n } } finally { sizeCtl = sc;// 注意這裡沒有 CAS 更新,這就是狀態變數的高明瞭,因為前面設定了 -1,此時這裡沒有競爭 } break; } } return tab; }
2. get 方法
get 方法可能看程式碼不是很長,但是他卻能 保證無鎖狀態下的記憶體一致性 ,他的每一句程式碼都要仔細理解,多設想一下如果發生競爭會怎樣,如此才能有所得;
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // 計算 hash if ((tab = table) != null && (n = tab.length) > 0 &&// 確保 table 已經初始化 // 確保對應的雜湊桶不為空,注意這裡是 Volatile 語義獲取;因為擴容的時候,是完全拷貝,所以只要不為空,則連結串列必然完整 (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // hash < 0,則必然在擴容,原來位置的節點可能全部移動到 i + oldCap 位置,所以利用多型到 nextTable 中查詢 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 遍歷連結串列 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
3. putVal 方法
注意 CHM 的 key 和 value 都不能為空
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());// hash 計算 int binCount = 0;// 狀態變數,主要表示查詢連結串列節點數,最後判斷是否轉為紅黑樹 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();// 初始化 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// cas 獲取雜湊桶 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas 更新,失敗時繼續迴圈更新 break;// no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);// 正在擴容的時候,先幫助擴容 else { V oldVal = null; synchronized (f) {// 注意這裡只鎖定了一個雜湊桶,所以比 1.7 中的 Segment 分段鎖 粒度更低 if (tabAt(tab, i) == f) {// 確認該雜湊桶是否已經移動 if (fh >= 0) {// hash >=0 則必然是普通節點,直接遍歷連結串列即可 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查詢成功 oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) {// 查詢失敗時,直接在末尾新增新節點 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) {// 樹根節點 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {// 紅黑樹查詢 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果連結串列長度大於8,轉為紅黑樹 if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); // 計數加一,注意這裡使用的是計數器,普通的 Atomic 變數仍然可能稱為效能瓶頸; return null; }
其具體流程如圖所示:
4. 擴容
擴容操作一直都是比較慢的操作,而 CHM 中巧妙的利用任務劃分,使得多個執行緒可能同時參與擴容;另外擴容條件也有兩個:
- 有連結串列長度超過 8,但是容量小於 64 的時候,發生擴容;
- 節點數超過閾值的時候,發生擴容;
其擴容的過程可描述為:
- 首先擴容過程的中,節點首先移動到過度表 nextTable ,所有節點移動完畢時替換散列表 table ;
- 移動時先將散列表定長等分,然後逆序依次領取任務擴容,設定 sizeCtl 標記正在擴容;
- 移動完成一個雜湊桶或者遇到空桶時,將其標記為 ForwardingNode 節點,並指向 nextTable ;
- 後有其他執行緒在操作雜湊表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成後再繼續之前的操作;
圖形化表示如下:
原始碼分析如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 根據 CPU 數量計算任務步長 if (nextTab == null) {// 初始化 nextTab try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 擴容一倍 nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; // 發生 OOM 時,不再擴容 return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 標記空桶,或已經轉移完畢的桶 boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) {// 逆向遍歷擴容 Node<K,V> f; int fh; while (advance) {// 向前獲取雜湊桶 int nextIndex, nextBound; if (--i >= bound || finishing)// 已經取到雜湊桶,或已完成時退出 advance = false; else if ((nextIndex = transferIndex) <= 0) { // 遍歷到達頭節點,已經沒有待遷移的桶,執行緒準備退出 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {// 當前任務完成,領取下一批雜湊桶 bound = nextBound; i = nextIndex - 1;// 索引指向下一批雜湊桶 advance = false; } } // i < 0:表示擴容結束,已經沒有待移動的雜湊桶 // i >= n :擴容結束,再次檢查確認 // i + n >= nextn : 在使用 nextTable 替換 table 時,有執行緒進入擴容就會出現 if (i < 0 || i >= n || i + n >= nextn) { // 完成擴容準備退出 int sc; if (finishing) {// 兩次檢查,只有最後一個擴容執行緒退出時,才更新變數 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 擴容執行緒減一 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;// 不是最後一個執行緒,直接退出 finishing = advance = true;// 最後一個執行緒,再次檢查 i = n;// recheck before commit } } else if ((f = tabAt(tab, i)) == null)// 當前節點為空,直接標記為 ForwardingNode,然後繼續獲取下一個桶 advance = casTabAt(tab, i, null, fwd); // 之前的執行緒已經完成該桶的移動,直接跳過,正常情況下自己的任務區間,不會出現 ForwardingNode 節點, else if ((fh = f.hash) == MOVED)// 此處為極端條件下的健壯性檢查 advance = true; // already processed // 開始處理連結串列 else { // 注意在 get 的時候,可以無鎖獲取,是因為擴容是全拷貝節點,完成後最後在更新雜湊桶 // 而在 put 的時候,是直接將節點加入尾部,獲取修改其中的值,此時如果允許 put 操作,最後就會發生髒讀, // 所以 put 和 transfer,需要競爭同一把鎖,也就是對應的雜湊桶,以保證記憶體一致性效果 synchronized (f) { if (tabAt(tab, i) == f) {// 確認鎖定的是同一個桶 Node<K,V> ln, hn; if (fh >= 0) {// 正常節點 int runBit = fh & n;// hash & n,判斷擴容後的索引 Node<K,V> lastRun = f; // 此處找到連結串列最後擴容後處於同一位置的連續節點,這樣最後一節就不用再一次複製了 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 依次將連結串列拆分成,lo、hi 兩條連結串列,即位置不變的連結串列,和位置 + oldCap 的連結串列 // 注意最後一節連結串列沒有new,而是直接使用原來的節點 // 同時連結串列的順序也被打亂了,lastRun 到最後為正序,前面一節為逆序 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln);// 插入 lo 連結串列 setTabAt(nextTab, i + n, hn);// 插入 hi 連結串列 setTabAt(tab, i, fwd);// 雜湊桶移動完成,標記為 ForwardingNode 節點 advance = true;// 繼續獲取下一個桶 } else if (f instanceof TreeBin) { // 拆分紅黑樹 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; // 為避免最後在反向遍歷,先留頭結點的引用, TreeNode<K,V> hi = null, hiTail = null; // 因為順序的連結串列,可以加速紅黑樹構造 int lc = 0, hc = 0;// 同樣記錄 lo,hi 連結串列的長度 for (Node<K,V> e = t.first; e != null; e = e.next) {// 中序遍歷紅黑樹 int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);// 構造紅黑樹節點 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 判斷是否需要將其轉化為紅黑樹,同時如果只有一條鏈,那麼就可以不用在構造 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
還有其他相關方法不是很複雜,就不詳細講了,比如 tryPresize,helpTransfer,addCount
5. 計數器
當獲取 Map.size 的時候,如果使用 Atomic 變數,很容易導致過度競爭,產生效能瓶頸,所以 CHM 中使用了,計數器的方式:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
private transient volatile CounterCell[] counterCells;// 計數器 @sun.misc.Contended static final class CounterCell {// @sun.misc.Contended 避免偽快取 volatile long value; CounterCell(long x) { value = x; } } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) {// 累計計數 if ((a = as[i]) != null) sum += a.value; } } return sum; }
具體細節還比較多,之後在單獨開一篇部落格詳細講解;
總結
- 首先 JDK1.8 的 CHM,沒有使用 Segment 分段鎖,而是直接鎖定單個雜湊桶
- 對陣列中的雜湊桶使用 CAS 操作,保證其可見性
- 對擴容是用,任務拆分,多執行緒同時擴容的方式,加速擴容
- 對 size 使用計數器思想
- CHM 中對狀態變數的應用,使得很多操作都得以無所化進行