1. 程式人生 > >ConcurrentHashMap原始碼解析 JDK8

ConcurrentHashMap原始碼解析 JDK8

一、簡介

上篇文章詳細介紹了HashMap的原始碼及原理,本文趁熱打鐵繼續分析ConcurrentHashMap的原理。

首先在看本文之前,希望對HashMap有一個詳細的瞭解。不然看直接看ConcurrentHashMap的原始碼還是有些費勁的。

相信對HashMap,HashTable有一定了解,應該知道HashMap是不具備執行緒安全性的,在resize時會丟資料(JDK8),而HashTable雖然保證了執行緒安全性,但是其是通過給每個方法加Synchronized關鍵字達到的同步目的。但是都知道Synchronized在競爭激烈的多執行緒併發環境中,在效能上的表現是非常不如人意的。那在高併發環境中HashMap如何保證執行緒安全而又不浪費太多效能呢?答案就是Java J.U.C併發包中的ConcurrentHashMap。

依然開局一張圖。JDK8中的ConcurrentHashMap資料結構。

 

 

 呃呵,和HashMap的結構是一樣的,沒錯在資料結構層面,ConcurrentHashMap和HashMap是完全一樣的。有了這個基礎繼續往下看。

二、歷史版本

ConcurrentHashMap的歷史版本大致分界線在JDK8。也就是可以分為JDK8和JDK8以前版本。

資料結構的區別

在JDK8之前HashMap沒有引入紅黑樹,同樣的ConcurrentHashMap也沒有引入紅黑樹。而且ConcurrentHashMap採用的是分段陣列的底層資料結構。

在JDK7中的資料結構。

從上圖我們不難看出其在資料結構方面的差別。

鎖的區別

JDK7中為了提高併發效能採用了這種分段的設計。所以在JDK7中ConcurrentHashMap採用的是分段鎖,也就是在每個Segment上加ReentrantLock實現的執行緒安全線。關於ReetrantLock後面有時間會介紹,大致來說ReetrantLoack是比Synchronized更細粒度的一種鎖。使用得當的話其效能要比Synchronized表現要好,但是如果實現不得當容易造成死鎖。

這種基於Segment和ReetrantLock的設計相對HashTable來說大大提高了併發效能。也就是說多個執行緒可以併發的操作多個Segment,而HashTable是通過給每個方法加Synchronized即將多執行緒序列而實現的。所以在一定程度上提高了併發效能。但是這種效能的提升表現相對JDK8來說顯得不值一提。

如果說JDK7 ConcurrentHashMap相對HashTable來說是序列到多個執行緒併發的改進。而JDK8則是通過比Segment更細粒度的併發控制大大提高了其並發表現。

JDK8中ConcurrentHashMap採用的是CAS+Synchronized鎖並且鎖粒度是每一個桶。簡單來說JDK7中鎖的粒度是Segment,JDK8鎖粒度細化到了桶級別。可想而知鎖粒度是大大提到了。輔之以程式碼的優化,JDK8中的ConcurrentHashMap在效能上的表現非常優秀。

簡單總結一下,從HashTable到JDK7 ConcurrentHashMap再到JDK8 ConcurrentHashMap。是從同步到併發再到高併發的進步。

三、基礎知識

3.1、常量

//正在擴容,對應fwd型別的節點的hash
static final int MOVED     = -1; // hash for forwarding nodes 
//當前陣列
transient volatile Node<K,V>[] table;
//擴容時用到的,擴容後的陣列。
private transient volatile Node<K,V>[] nextTable;
//1,大於零,表示size * 0.75。
//2,等於-1,表示正在初始化。
//3,-(n + 1),表示正在執行擴容的執行緒其只表示基數,而不是真正的數量,需要計算得出的哦
private transient volatile int sizeCtl;

3.2、Unsafe類方法

 1     @SuppressWarnings("unchecked")  //transient volatile Node<K,V>[] table; tab變數確實是volatile
 2     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//獲取table中索引 i 處的元素。
 3         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//如果tab是volatile變數,則該方法保證其可見性。
 4     }
 5 
 6     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//通過CAS設定table索引為 i 處的元素。
 7                                         Node<K,V> c, Node<K,V> v) {
 8         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
 9     }
10             //transient volatile Node<K,V>[] table; tab變數確實是volatile
11     static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 處的元素。
12         U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//如果tab是volatile變數,則該方法保證其可見性。
13     }

 

我們不難看出 以上三個方法都是呼叫的Unsafe(U)類中的方法,Unsafe類中定義了大量對記憶體的操作方法,是native的,不建議開發者直接使用。

tabAt和setTabAt最終呼叫的兩個方法分別是 U.getObjectVolatile()和U.putObjectVolatile 顧名思義其是通過volatile保證的tab的可見性(Volatile只保證可見性不保證原子性哦)。前提是tab變數是Volatile修飾的變數。我們通過呼叫棧,最紅可以看到其實tab就是ConcurrentHashMap中的table。而這個變數是這麼定義的。

transient volatile Node<K,V>[] table;

 

可見其確實是Volatile修飾的變數。

再看

casTabAt方法,這個就是CAS方法了。

CAS:Compare and Swap三個單詞的縮寫,即:比較交換的意思。CAS在Java中又稱之為樂觀鎖即我們總認為是沒有鎖的。

while(true){
    CAS();  
}

一般的通過上述用法達到自旋的目的。CAS一般通過自旋達到自旋鎖的目的,即認為沒有鎖,失敗重試,這種思路。更多內容請自行百度。CAS很重要哦。

四、put過程原始碼

 1 public V put(K key, V value) {
 2     return putVal(key, value, false);
 3 }
 4 
 5 /** Implementation for put and putIfAbsent */
 6 final V putVal(K key, V value, boolean onlyIfAbsent) {
 7     if (key == null || value == null) throw new NullPointerException();
 8     int hash = spread(key.hashCode());//hash,對hashcode再雜湊
 9     int binCount = 0;
10     for (Node<K,V>[] tab = table;;) {//迭代桶陣列,自旋
11         Node<K,V> f; int n, i, fh;
12         if (tab == null || (n = tab.length) == 0)//懶載入。如果為空,則進行初始化
13             tab = initTable();//初始化桶陣列
14         //(n - 1) & hash)計算下標,取值,為空即無hash碰撞
15         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
16             if (casTabAt(tab, i, null,
17                          new Node<K,V>(hash, key, value, null)))//通過cas插入新值
18                 break;                   // no lock when adding to empty bin
19         }
20         //判斷是否正在擴容。如果正在擴容,當前執行緒幫助進行擴容。
21         //每個執行緒只能同時負責一個桶上的資料遷移,並且不影響其它桶的put和get操作。
22         //(很牛逼的思路,能這麼做建立在更細粒度的鎖基礎上)
23         else if ((fh = f.hash) == MOVED)
24             tab = helpTransfer(tab, f);
25         else {//put5,存在hash碰撞
26             V oldVal = null;
27             //此處,f在上面已經被賦值,f為當前下標桶的首元素。對連結串列來說是連結串列頭對紅黑樹來說是紅黑樹的頭元素。
28             synchronized (f) {
29                 //再次檢查當前節點是否有變化,有變化進入下一輪自旋
30                 //為什麼再次檢查?因為不能保證,當前執行緒到這裡,有沒有其他執行緒對該節點進行修改
31                 if (tabAt(tab, i) == f) {
32                     if (fh >= 0) {//當前桶為連結串列
33                         binCount = 1;
34                         for (Node<K,V> e = f;; ++binCount) {//迭代連結串列節點
35                             K ek;
36                             if (e.hash == hash &&//key相同,覆蓋(onlyIfAbsent有什麼用?)
37                                 ((ek = e.key) == key ||
38                                  (ek != null && key.equals(ek)))) {
39                                 oldVal = e.val;
40                                 if (!onlyIfAbsent)
41                                     e.val = value;
42                                 break;
43                             }
44                             Node<K,V> pred = e;
45                             //找到連結串列尾部,插入新節點。(什麼這裡不用CAS?因為這在同步程式碼塊裡面)
46                             if ((e = e.next) == null) {
47                                 pred.next = new Node<K,V>(hash, key,
48                                                           value, null);
49                                 break;
50                             }
51                         }
52                     }
53                     else if (f instanceof TreeBin) {//當前桶為紅黑樹
54                         Node<K,V> p;
55                         binCount = 2;
56                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
57                                                        value)) != null) {//想紅黑樹插入新節點
58                             oldVal = p.val;
59                             if (!onlyIfAbsent)
60                                 p.val = value;
61                         }
62                     }
63                 }
64             }
65             if (binCount != 0) {
66                 //樹化。binCount > 8,進行樹化,連結串列轉紅黑樹
67                 if (binCount >= TREEIFY_THRESHOLD)
68                     //如果容量 < 64則直接進行擴容;不轉紅黑樹。
69                     //(你想想,假如容量為16,你就插入了9個元素,巧了,都在同一個桶裡面,
70                     //如果這時進行樹化,時間複雜度會增加,效能下降,不如直接進行擴容,空間換時間)
71                     treeifyBin(tab, i);
72                 if (oldVal != null)
73                     return oldVal;
74                 break;
75             }
76         }
77     }
78     addCount(1L, binCount);//擴容。addCount內部會進行判斷要不要擴容
79     return null;
80 }

總結以上過程

1,懶載入,未初始化則初始化table
2,hash,hashcode再雜湊,並計算下標
3,無碰撞,通過CAS插入
4,有碰撞
  4.1、如果正在擴容,協助其它執行緒去擴容
  4.2、如果是連結串列,插入連結串列
  4.3、如果是紅黑樹,插入紅黑樹
  4.4、如果連結串列長度超過8,樹化
  4.5,如果key已經存在,覆蓋舊值
5,需要擴容,則擴容

相比HashMap過程多了一個協助擴容。

以上原始碼需要注意的是

1 for (Node<K,V>[] tab = table;;) {//迭代桶陣列,自旋
2     
3 }

這是一個自旋的過程,如果CAS修改失敗會進入下一輪自旋。很久以前看這段原始碼的時候,我總是在想,CAS失敗了不就丟資料了嗎?所以這個自旋,也稱之為自旋鎖會保證資料一定能插入成功。

說說上面鎖競爭的情況,以上過程我們不難發現對table的修改都是通過CAS操作實現的。比如下面這行程式碼,如果已經有執行緒正在操作 i 位置的元素,則意味著本輪自旋將會失敗,繼續自旋,當其他執行緒修改完成,本執行緒再次執行到tabAt以為是Volatile操作,其他執行緒的修改對本執行緒立即可見(詳見Volatile關鍵字記憶體語義的內容)。本執行緒通過tabAt發現該處已經存在元素,即發生碰撞,繼續往下執行。

 

1 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
2     if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//通過cas插入新值
3      break;                   // no lock when adding to empty bin
4 }

執行緒的排程需要作業系統從使用者態轉為核心態,這是非常重量級的操作。CAS+自旋組成的自旋鎖保證了執行緒不會進入阻塞態。

然後繼續往下看

 

synchronized (f) {
    //再次檢查當前節點是否有變化,有變化進入下一輪自旋
    //為什麼再次檢查?因為不能保證,當前執行緒執行到這裡,有沒有其他執行緒對該節點進行修改
    if (tabAt(tab, i) == f) {

先看這行程式碼 synchronized (f) 這個f是一個桶的頭元素。也就是說在JDK8中synchronized鎖僅僅只鎖鏈表頭或者紅黑樹的頭(其實就是鎖一個桶,因為要訪問連結串列或者紅黑樹總要從頭開始訪問吧)

再看 if (tabAt(tab, i) == f) {} 其實就是雙重檢測(參考單例的雙重檢測),為什麼要再檢查一遍呢?因為不能保證當前執行緒執行到這裡,有沒有其他執行緒已經對該節點進行了修改。

initTable()

 1 private final Node<K,V>[] initTable() {
 2     Node<K,V>[] tab; int sc;
 3     while ((tab = table) == null || tab.length == 0) {
 4         // 賦值sc。並當sizeCtl == -1 即當前有執行緒正在執行初始化
 5         if ((sc = sizeCtl) < 0) 
 6             //yield()暫停當前正在執行的執行緒,執行其他執行緒
 7             //(這是一個通知,但是這是不一定會讓當前執行緒停止,要取決於執行緒排程器)
 8             //就是我想讓出資源,但是這只是一廂情願的事情,執行緒排程器會考慮你的方法,但是不一定採納。
 9             Thread.yield(); 
10         //修改 sizeCtl 的值為 -1。 SIZECTL 為 sizeCtl 的記憶體地址。
11         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
12             try {
13                 //執行初始化過程
14                 if ((tab = table) == null || tab.length == 0) {
15                     //sc在上面已經賦值,=原來 sizeCtl的值。是非討厭JDK原始碼這種賦值方式。
16                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
17                     @SuppressWarnings("unchecked")
18                     //建立一個sc長度的table。
19                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
20                     table = tab = nt;
21                     sc = n - (n >>> 2);
22                 }
23             } finally {
24                 //初始化完成, sizeCtl重新賦值為當前陣列的長度。
25                 sizeCtl = sc;
26             }
27             break;
28         }
29     }
30     return tab;
31 }

以上過程,同樣是通過CAS實現的初始化控制,保證只有一個執行緒去執行初始化。

helpTransfer(tab, f);方法我們後面介紹完擴容再說。

看完以上put過程,我們能發現,JDK8通過CAS+自旋鎖將鎖的粒度控制在每一個桶上,相對於JDK7中Segment鎖,鎖粒度提高了很多。並且CAS+自旋鎖保證了不會出現執行緒的切花這種重量級的操作。

五、擴容

  1 //tab舊桶陣列,nextTab新桶陣列
  2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3     int n = tab.length, stride;
  4     //控制併發數,控制CPU的資源
  5     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  6         stride = MIN_TRANSFER_STRIDE; // subdivide range
  7     if (nextTab == null) {            // initiating//新陣列為空,則初始化新陣列
  8         try {
  9             @SuppressWarnings("unchecked")
 10             //擴容為原來的兩倍 n << 1
 11             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 12             nextTab = nt;
 13         } catch (Throwable ex) {      // try to cope with OOME
 14             sizeCtl = Integer.MAX_VALUE;
 15             return;
 16         }
 17         nextTable = nextTab;
 18         transferIndex = n;
 19     }
 20     int nextn = nextTab.length;
 21     //在這裡面進行new Node將node.hash置為-1。表示該桶正在進行移動。
 22     //(這裡很重要的一點是,只鎖表頭,所以只需要將連結串列(或者紅黑樹)頭結點.hash置為-1即可)
 23     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 24     //advance是控制是否繼續進行移動的條件,當advance == false,表示正在移動一個桶。
 25     //true表示可以繼續進行下一個桶的移動
 26     boolean advance = true;
 27     boolean finishing = false; // to ensure sweep before committing nextTab
 28     for (int i = 0, bound = 0;;) {//自旋
 29         Node<K,V> f; int fh;
 30         while (advance) {//start
 31             int nextIndex, nextBound;
 32             //當前桶是不是已經移動完了
 33             if (--i >= bound || finishing)
 34                 advance = false;
 35             //兩個停止移動的條件。移動完了。(這個是真正停止的條件。下面那個條件會進行一次檢查)
 36             else if ((nextIndex = transferIndex) <= 0) {
 37                 i = -1;
 38                 advance = false;
 39             }
 40             else if (U.compareAndSwapInt
 41                      (this, TRANSFERINDEX, nextIndex,
 42                       nextBound = (nextIndex > stride ?
 43                                    nextIndex - stride : 0))) {
 44                 bound = nextBound;
 45                 i = nextIndex - 1;
 46                 advance = false;
 47             }
 48         }
 49         if (i < 0 || i >= n || i + n >= nextn) {
 50             int sc;
 51             if (finishing) {//結束擴容
 52                 nextTable = null;
 53                 table = nextTab;
 54                 sizeCtl = (n << 1) - (n >>> 1);
 55                 return;
 56             }
 57             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 58                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 59                     return;
 60                 finishing = advance = true;
 61                 i = n; // recheck before commit 再次檢查一遍,防止有桶中還有資料沒移動。
 62             }
 63         }//end 從start到end可看可不看就是條件控制,包括結束條件的控制,移動進度的控制等。
 64         //該桶沒資料
 65         else if ((f = tabAt(tab, i)) == null)
 66             //將oldtab中的該桶設定為fwd節點,hash=-1
 67             advance = casTabAt(tab, i, null, fwd);
 68         //已經移動過的桶其hash=-1
 69         else if ((fh = f.hash) == MOVED)
 70             advance = true; // already processed
 71         else {
 72             synchronized (f) {//上鎖
 73                 if (tabAt(tab, i) == f) {
 74                     //ln新連結串列,不需要移動的節點重新組組織成的連結串列。
 75                     //hn新連結串列,需要移動的節點重新組織成的連結串列
 76                     Node<K,V> ln, hn;
 77                     if (fh >= 0) {//連結串列
 78                         int runBit = fh & n;
 79                         Node<K,V> lastRun = f;
 80                         //start
 81                         //從start,到end之間。不看也行。實在費腦子。其實這段程式碼寫的有點讓人費解
 82                         //主要是不認真看不知道作者的意圖。本意是這樣的。判斷是不是可以從某個節點n開始
 83                         //後面的節點是不是都是和節點n一樣,移動的目標桶一樣的。
 84                         //如果是一樣的,則後面的這些節點就不用移動了,只需要移動n節點即可。
 85                         //(注意連結串列的引用,next指標就把後面的都帶過去了)
 86                         //想一個極端情況,如果在這裡迭代後發現,所有節點,擴容後資料移動的目標桶都是一樣的。
 87                         //則只需要移動頭結點即可。不用重新拼接連結串列了。
 88                         for (Node<K,V> p = f.next; p != null; p = p.next) {
 89                             int b = p.hash & n;                             
 90                             if (b != runBit) {                              
 91                                 runBit = b;                                 
 92                                 lastRun = p;                               
 93                             }
 94                         }
 95                         if (runBit == 0) {// runBit== 0 表示該節點不需要移動
 96                             ln = lastRun;
 97                             hn = null;
 98                         }
 99                         else {
100                             hn = lastRun;
101                             ln = null;
102                         }//end
103                         for (Node<K,V> p = f; p != lastRun; p = p.next) {
104                             int ph = p.hash; K pk = p.key; V pv = p.val;
105                             if ((ph & n) == 0)
106                                 ln = new Node<K,V>(ph, pk, pv, ln);
107                             else
108                                 hn = new Node<K,V>(ph, pk, pv, hn);
109                         }
110                         setTabAt(nextTab, i, ln);
111                         setTabAt(nextTab, i + n, hn);
112                         setTabAt(tab, i, fwd);
113                         advance = true;
114                     }
115                     else if (f instanceof TreeBin) {//紅黑樹
116                         TreeBin<K,V> t = (TreeBin<K,V>)f;
117                         TreeNode<K,V> lo = null, loTail = null;
118                         TreeNode<K,V> hi = null, hiTail = null;
119                         int lc = 0, hc = 0;
120                         for (Node<K,V> e = t.first; e != null; e = e.next) {
121                             int h = e.hash;
122                             TreeNode<K,V> p = new TreeNode<K,V>
123                                 (h, e.key, e.val, null, null);
124                             if ((h & n) == 0) {
125                                 if ((p.prev = loTail) == null)
126                                     lo = p;
127                                 else
128                                     loTail.next = p;
129                                 loTail = p;
130                                 ++lc;
131                             }
132                             else {
133                                 if ((p.prev = hiTail) == null)
134                                     hi = p;
135                                 else
136                                     hiTail.next = p;
137                                 hiTail = p;
138                                 ++hc;
139                             }
140                         }
141                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
142                             (hc != 0) ? new TreeBin<K,V>(lo) : t;
143                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
144                             (lc != 0) ? new TreeBin<K,V>(hi) : t;
145                         setTabAt(nextTab, i, ln);
146                         setTabAt(nextTab, i + n, hn);
147                         setTabAt(tab, i, fwd);
148                         advance = true;
149                     }
150                 }
151             }
152         }
153     }
154 }

5.1、擴容前準備階段

ForwardingNode

1 static final class ForwardingNode<K,V> extends Node<K,V> {
2     final Node<K,V>[] nextTable;
3     ForwardingNode(Node<K,V>[] tab) {
4         super(MOVED, null, null, null);
5         this.nextTable = tab;
6     }
7 }

看一下這個內部類,其實呢其就是一個起到標識作用的節點,該節點看上面程式碼可知,該節點最主要的特點就是hash=MOVED=-1。hash=-1的節點在ConcurrentHashMap中表示該桶是被擴容過程遷移過的桶。然後當前執行緒判斷如果該桶已經被遷移。無論put還是get都去新的陣列中操作。還有一點很重要,還可以通過ForwardingNode中 nextTable獲取到新的陣列。

 

1 //該桶沒資料
2 else if ((f = tabAt(tab, i)) == null)
3     //將oldtab中的該桶設定為fwd節點,hash=-1
4     advance = casTabAt(tab, i, null, fwd);

看上面程式碼,先判斷該桶還有沒有資料。沒資料不用遷移,等同於已經遷移完了。其他執行緒put會直接put到新的陣列中。

 

1 //已經移動過的桶其hash=-1;
2 else if ((fh = f.hash) == MOVED)
3     advance = true; // already processed

如果該桶已經移動則跳過。

到此我們能看出什麼?主要是已經移動完的設定成fwd節點,其它執行緒看到該桶已經移動,則會到新的table中操作。如果未移動,還直接操作當前table,因為就算put,待會處理到該桶,一樣移動到新桶,也沒啥影響。如果是正在移動的接下來會看到加了Synchronized鎖,保證只有一個執行緒能操作當前桶。簡直不要太妙。

5.2、擴容過程

畫重點,擴容過程

 1 synchronized (f) {//上鎖
 2     if (tabAt(tab, i) == f) {
 3         //ln新連結串列,不需要移動的節點重新組組織成的連結串列。
 4         //hn新連結串列,需要移動的節點重新組織成的連結串列
 5         Node<K,V> ln, hn;
 6         if (fh >= 0) {//連結串列
 7             int runBit = fh & n;
 8             Node<K,V> lastRun = f;
 9             //start
10             //從start,到end之間。不看也行。實在費腦子。其實這段程式碼寫的有點讓人費解
11             //主要是不認真看不知道作者的意圖。本意是這樣的。判斷是不是可以從某個節點n開始
12             //後面的節點是不是都是和節點n一樣,移動的目標桶一樣的。
13             //如果是一樣的,則後面的這些節點就不用移動了,只需要移動n節點即可。
14             //(注意連結串列的引用,next指標就把後面的都帶過去了)
15             //想一個極端情況,如果在這裡迭代後發現,所有節點,擴容後資料移動的目標桶都是一樣的。
16             //則只需要移動頭結點即可。不用重新拼接連結串列了。
17             for (Node<K,V> p = f.next; p != null; p = p.next) {
18                 int b = p.hash & n;                             
19                 if (b != runBit) {                              
20                     runBit = b;                                 
21                     lastRun = p;                               
22                 }
23             }
24             if (runBit == 0) {// runBit== 0 表示該節點不需要移動
25                 ln = lastRun;
26                 hn = null;
27             }
28             else {
29                 hn = lastRun;
30                 ln = null;
31             }//end
32             for (Node<K,V> p = f; p != lastRun; p = p.next) {
33                 int ph = p.hash; K pk = p.key; V pv = p.val;
34                 if ((ph & n) == 0)
35                     ln = new Node<K,V>(ph, pk, pv, ln);
36                 else
37                     hn = new Node<K,V>(ph, pk, pv, hn);
38             }
39             setTabAt(nextTab, i, ln);
40             setTabAt(nextTab, i + n, hn);
41             setTabAt(tab, i, fwd);
42             advance = true;
43         }
44         else if (f instanceof TreeBin) {//紅黑樹
45             //紅黑樹跳過
46         }
47     }
48 }

 

 5.2.1、併發控制

首先擴容過程是在synchronized同步程式碼塊中的。並且只鎖了一個表頭。可看到沒有鎖新陣列nextTab的桶。想想,oldTab(tab變數)和nextTab都是多個執行緒共享的變數,為什麼只有只鎖了oldTab正在操作的桶?如果有多個執行緒向nextTab同時遷移資料怎麼辦?會不會存線上程安全性問題?

TIPS:
    統一術語
    tab = oldTab = table(舊陣列)
    newTab = nextTab(擴容後新陣列)
    oldIndex即在oldTab中的索引位
    newIndex即在newTab中的位置

在上一篇文章中介紹HashMap的時候詳細介紹了HashMap擴容中,oldTab舊桶遷移向newTab只有兩個目標桶。再簡單回顧一遍。

上面這張圖形象的展示了舊桶在擴容後的兩個去向:1,索引位原地不動,2,索引位為oldCap+oldIndex。(關於為什麼是這兩個去向,在HashMap擴容中已經詳細介紹了)

如果你還沒懂我的疑問,請參考下面這個圖。

前提,ConcurrentHashMap是併發擴容,可以有多個執行緒同時擴容,其次如果如上圖紅線那樣,oldTab中有多個桶中的資料遷移到newTab中的同一個桶中,如果出現這種情況就意味著存線上程安全性問題。

從上圖5-1中,兩個資料遷移的方向可知,擴容前,oldIndex不同就表示不在一個桶,擴容後的兩個去向如果oldIndex不一樣,也一定不在同一個桶。所以不會出現5-2圖中紅線的那種情況,也就說明在擴容過程中不需要鎖newTab。佩服+2

 

5.2.2、資料遷移

//ln新連結串列,不需要移動的節點重新組組織成的連結串列。
//hn新連結串列,需要移動的節點重新組織成的連結串列
Node<K,V> ln, hn;
int runBit = fh & n;

看兩個變數,上面說過擴容後,舊桶中的資料只有兩個遷移的方向。ln正是資料遷移後索引位依然是oldIndex的資料的連結串列,hn是遷移後需要遷移到oldCap + oldIndex索引位的連結串列。

關注一下runBit變數,如果 runBit == 0 成立則說明遷移後桶的索引位依然是oldIndex。詳見HashMap擴容分析。

重點關注一下start到end之間的程式碼

關於這段程式碼,首選我們假設一種極端情況,如果當前正在移動的桶中的資料在rehash之後,資料遷移的目標桶除了第一個節點的目標桶是oldIndex之外,後面的資料的目標桶都是oldIndex + oldCap。我們還需要處理後面的節點嗎?不需要,因為只需要將第二個節點移動到newTab的oldIndex + oldCap位置即可。第二個元素也就是lastRun變數。相對於HashMap完全的將資料組織成兩個連結串列,這也算得上是一個性能上的優化吧。

接著往下看

程式碼段1:

1 for (Node<K,V> p = f.next; p != null; p = p.next) {
2     int b = p.hash & n;                             
3     if (b != runBit) {//相同跳過                          
4         runBit = b;                                 
5         lastRun = p;                               
6     }
7 }

以上程式碼通過對連結串列的一次掃描決定了lastRun。

程式碼段2:

1 if (runBit == 0) {// runBit== 0 表示該節點不需要移動
2     ln = lastRun;
3     hn = null;
4 }
5 else {
6     hn = lastRun;
7     ln = null;
8 }//end

 

根據lastRun指向的節點的runBit決定後續節點在擴容後是oldIndex + oldCap還是oldIndex。

程式碼段3:

1 for (Node<K,V> p = f; p != lastRun; p = p.next) {
2     int ph = p.hash; K pk = p.key; V pv = p.val;
3     if ((ph & n) == 0)
4         ln = new Node<K,V>(ph, pk, pv, ln);
5     else
6         hn = new Node<K,V>(ph, pk, pv, hn);
7 }

 

上述程式碼會重新組織兩個新連結串列。注意這個迭代到lastRun位置結束,因為以上過程已經確定了lastRun的歸屬。

看一下 ln = new Node<K,V>(ph, pk, pv, ln); 重新組織連結串列的程式碼,也就是ln會成為新new出來的node的下一個節點。

這樣有什麼問題?問題就是節點在舊桶中的相對順序在新桶中將相反。也就是next的指標翻轉一下。可以看一下node的建構函式就明瞭了。

 

演示擴容過程

假設當前擴容前oldCap即oldTab的長度為2,擴容後newCap即newTab的長度為4。如下圖看擴容過程,橘色的代表遷移後索引位依然是oldIndex,綠色代表擴容後索引位為oldIndex + oldCap。

 

 

 上述程式碼段1迭代找到了lastRun即指向node(11),程式碼段2將lastRun賦值給hn。程式碼段3執行過程如下

1,將node(1)拼接到ln
2,將node(3)拼接到hn,此時注意,hn已經lastRun指向的節點node(11),此時hn=3—>11—>15—>19—>null
3,處理node(5)拼接到ln
4,處理...

 

對比JDK7 HashMap,JDK8 HashMap,JDK8 ConcurrentHashMap在擴容後對節點相對順序的保證方面,JDK7 HashMap是完全倒序。JDK8 HashMap不改變相對順序。JDK8 ConcurrentHashMap 保證部分節點的相對順序,其餘的倒序。

題外話,從程式碼風格和死路上,猜測一下ConcurrentHashMap應該是來自JDK7的HashMap。

 

1 setTabAt(nextTab, i, ln);
2 setTabAt(nextTab, i + n, hn);
3 setTabAt(tab, i, fwd);
4 advance = true;

ln和hn兩個連結串列各回各家各找各媽。

 

回過頭來再看put方法中的幫助擴容

1 else if ((fh = f.hash) == MOVED)
2     tab = helpTransfer(tab, f);

 

在put方法中有這樣一行判斷,當f.hash = MOVED即當前HashMap正在擴容中,則當前執行緒會去嘗試幫助擴容。

 1 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 2     Node<K,V>[] nextTab; int sc;
 3     if (tab != null && (f instanceof ForwardingNode) &&//條件判斷
 4         (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//從fwd節點中取出新table
 5         int rs = resizeStamp(tab.length);
 6         while (nextTab == nextTable && table == tab &&
 7                (sc = sizeCtl) < 0) {
 8             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 9                 sc == rs + MAX_RESIZERS || transferIndex <= 0)
10                 break;
11             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//修改sizeCtl = sizeCtl + 1,表示多了一個執行緒參與擴容
12                 transfer(tab, nextTab);
13                 break;
14             }
15         }
16         return nextTab;
17     }
18     return table;
19 }

 

在helpTransfer方法中會首先做一系列判斷,通過fwd節點獲取到nextTab即新的陣列。通過CAS 實現sizeCtl++操作,表示多了一個執行緒進行擴容,因為在擴容方法中對擴容執行緒數量有控制。

 

最後的最後,擴容的時機

說一下觸發擴容的操作,總的來說就是put操作,但是有兩個時機很重要,其一就是addCount方法中,每次put一個元素,在addCount方法中都會判斷需不需要進行擴容。另外就是treeifyBin方法中,如果桶中資料超過了8個並且陣列長度<64則不會進行樹化,而是進行擴容。關於這個在HashMap原始碼介紹中也有介紹。你想想,假如容量為16,你就插入了9個元素,巧了,都在同一個桶裡面,如果這時進行樹化,樹化本身就是一個耗時的過程。時間複雜度會增加,效能下降,不如直接進行擴容,空間換時間。

終於擴容過程寫完了。很經典,想讀懂也很費勁。

六、get過程原始碼

 1 public V get(Object key) {
 2     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 3     int h = spread(key.hashCode());//hash
 4     if ((tab = table) != null && (n = tab.length) > 0 &&
 5         (e = tabAt(tab, (n - 1) & h)) != null) {//取桶
 6         if ((eh = e.hash) == h) {//key相同直接返回
 7             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
 8                 return e.val;
 9         }
10         else if (eh < 0)//hash < 0 表示正在擴容
11             //在這裡需要非常注意的一點,擴容後的桶會放入fwd節點
12             //該節點hash = MOVED,fwd.nextTable為擴容後新的陣列。
13             return (p = e.find(h, key)) != null ? p.val : null;
14         while ((e = e.next) != null) {//迭代連結串列
15             if (e.hash == h &&
16                 ((ek = e.key) == key || (ek != null && key.equals(ek))))
17                 return e.val;
18         }
19     }
20     return null;
21 }

 

get原始碼只關注下面這行

return (p = e.find(h, key)) != null ? p.val : null;

 

當該桶已經被移動,則通過e.find方法去nextTab新陣列查詢。首先在5章節resize擴容方法中,已經擴容的桶會被塞進去一個ForwardingNode節點 setTabAt(tab, i, fwd); 繼續看resize方法中ForwardingNode的初始化會發現是這樣初始化的 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); ,看它的構造方法

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

 

不難發下其初始化方法接收一個nextTab也就是擴容後的新陣列,並將該陣列賦值給其內部變數nextTable。也就是說當get發現桶已經擴容後,我們可以從fwd節點中找到新的陣列。並從新的陣列中找到新的目標桶並進行元素查詢。

看了以上程式碼,回到 e.find(h, key)) ,需要明確的是e就是ForwardingNode節點。看看find方法

 1 Node<K,V> find(int h, Object k) {
 2     // loop to avoid arbitrarily deep recursion on forwarding nodes
 3     outer: for (Node<K,V>[] tab = nextTable;;) {//迭代nextTable
 4         Node<K,V> e; int n;
 5         if (k == null || tab == null || (n = tab.length) == 0 ||
 6             (e = tabAt(tab, (n - 1) & h)) == null)
 7             return null;
 8         for (;;) {
 9             int eh; K ek;
10             if ((eh = e.hash) == h &&
11                 ((ek = e.key) == k || (ek != null && k.equals(ek))))
12                 return e;
13             if (eh < 0) {
14                 if (e instanceof ForwardingNode) {
15                     tab = ((ForwardingNode<K,V>)e).nextTable;
16                     continue outer;
17                 }
18                 else
19                     return e.find(h, k);
20             }
21             if ((e = e.next) == null)
22                 return null;
23         }
24     }
25 }

OK,很明確了,確實是從nextTable中查詢的。

得出一個結論,ConcurrentHashMap擴容不影響get操作。也就是在擴容過程中可以併發讀。

七、ConcurrentHashMap併發控制

詳細看了ConcurrentHashMap put resize get過程的原始碼,本章從整體上看一下ConcurrentHashMap的併發控制。

下面結合圖片我們看一下ConcurrentHashMap的併發過程。

 

 

 

 

 如上圖,執行緒1進行put操作,這時發現size > sizeCtl。開始進行擴容

 

 

 

 

此時執行緒1已經完成oldTab中索引[2,16)中的擴容。正在進行索引為1的桶的擴容。接下來執行緒2執行get。

 

 

 

 

執行緒2根據get邏輯和key的hash,可能訪問的三種情況如上圖所示

情況一:訪問藍色號桶,即未擴容的桶。該桶還未進行擴容,所以在桶中找到對應元素,返回。

情況二:訪問綠色桶,即正在擴容的桶。該桶正在擴容,在擴容過程中,執行緒1持有Synchronized鎖,執行緒2只能自旋等待。

情況三:訪問橘色桶,該桶已擴容的桶。該桶已擴容,oldTab中是fwd節點,hash=-1,所以執行fwd節點的find邏輯,fwd節點持有newTab(nextTable),所以執行緒2去newTab中查詢對應元素,返回。

 

 如上圖4,當執行緒1進行擴容時,執行緒3進來執行put,同樣存在三種可能的情況

情況一:訪問藍色桶,即未擴容的桶。正常執行put邏輯。

情況二:訪問綠色桶,即正擴容的桶。因為線層1持有Synchronized鎖,執行緒3將一直自旋,等待擴容結束。

情況三:訪問橘色桶,即已擴容的桶。因為已擴容的桶,在oldTab中是fwd節點,hash = -1 = MOVED,所以執行緒3執行幫助擴容的邏輯。等待擴容完成,執行緒3繼續完成put邏輯。

 

OK,以上就是ConcurrentHashMap關於get put resize的併發控制,從以上過程可見,存在鎖競爭的情況很有限,即使存在鎖競爭,也是進行自旋,而不會阻塞執行緒。可見ConcurrentHashMap能做到高效的併發讀。

在put過程中,因為如果存線上程正在已經擴容,則幫助進行擴容(協助擴容這塊,有一個步長的概念,同時進行擴容的執行緒和table的長度有關)。如果當前桶正在進行擴容,則被Synchronized鎖拒之門外,自旋等待擴容結束。如果訪問的是未擴容的桶,則執行正常的put邏輯。可見整個過程中,由於鎖的粒度很小,put做到了高效的併發寫,也做到了高效的擴容。

總之一句話ConcurrentHashMap的高併發是通過 CAS樂觀鎖 + 自旋鎖 + 細粒度 保證的。

 

  如有錯誤的地方還請留言指正。
  原創不易,轉載請註明原文地址:https://www.cnblogs.com/hello-shf/p/12183263.h