1. 程式人生 > >【轉】Java併發程式設計:併發容器之ConcurrentHashMap

【轉】Java併發程式設計:併發容器之ConcurrentHashMap

  JDK5中添加了新的concurrent包,相對同步容器而言,併發容器通過一些機制改進了併發效能。因為同步容器將所有對容器狀態的訪問都序列化了,這樣保證了執行緒的安全性,所以這種方法的代價就是嚴重降低了併發性,當多個執行緒競爭容器時,吞吐量嚴重降低。因此Java5.0開始針對多執行緒併發訪問設計,提供了併發效能較好的併發容器,引入了java.util.concurrent包。與Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的併發容器主要解決了兩個問題:

  1)根據具體場景進行設計,儘量避免synchronized,提高併發性。

  2)定義了一些併發安全的複合操作,並且保證併發環境下的迭代操作不會出錯。

  util.concurrent中容器在迭代時,可以不封裝在synchronized中,可以保證不拋異常,但是未必每次看到的都是“最新的、當前的”資料。

  下面是對併發容器的簡單介紹:

  ConcurrentHashMap代替同步的Map(Conllections.synchronizedMap(new HashMap<>())),眾所周知,HashMap是根據雜湊值分段儲存的,同步Map在同步的時候鎖住了所有的段,而ConcurrentHashMap加鎖的時候根據雜湊值鎖住了雜湊值鎖對應的那段,因此提高了併發效能

  ConcurrentHashMap也增加了對常用複合操作的支援,比如“若沒有則新增”:putIfAbsent(),替換:replace()。這2個操作都是原子操作。

  CopyOnWriteArrayList和CopyOnWriteArraySet分別代替List和Set,主要是在遍歷操作為主的情況下來代替同步的List和同步的Set,這也就是上面所述的思路:迭代過程要保證不出錯,除了加鎖,另外一種方法就是“克隆”容器物件。

  ConcurrentLinkedQueue是一個先進先出的佇列,它是非阻塞佇列。

  ConcurrentSkipListMap可以在高效併發中替代SoredMap(例如用Collections.synchronizedMap包裝的TreeMap)。

  ConcurrentSkipListSet可以在高效併發中替代SoredSet(例如用Collections.synchronizedSet包裝的TreeSet)。

  大家都知道HashMap是非執行緒安全的,Hashtable是執行緒安全的,但是由於Hashtable是採用synchronized進行同步,相當於所有執行緒進行讀寫時都去競爭一把鎖,導致效率非常低下。

  ConcurrentHashMap可以做到讀取資料不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持得儘量的小,不用對整個ConcurrentHashMap加鎖

  

ConcurrentHashMap的內部結構

  ConcurrentHashMap為了提高本身的併發能力,在內部採用了一個叫做Segment的結構,一個Segment其實就是一個類雜湊表的結構,Segment內部維護了一個連結串列陣列,我們用下面這一幅圖來看下ConcurrentHashMap的內部結構:

  從上面的結構可以瞭解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的連結串列的頭部,因此這一種結構帶來的副作用是Hash的過程要比普通的HashMap要長,但是帶來的好處是寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其它的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支援Segment數量大小的寫操作(剛好這些寫操作都非常平均地分佈在所有的Segment上),所以,通過這一種結構,ConcurrentHashMap的併發能力可以大大的提高

 

Segment

  我們再來具體瞭解一下Segment的資料結構:

1 static final class Segment<K,V> extends ReentrantLock implements Serializable {
2     transient volatile int count;
3     transient int modCount;
4     transient int threshold;
5     transient volatile HashEntry<K,V>[] table;
6     final float loadFactor;
7 }

  詳細解釋一下segment裡面的成員變數的意義:

  • count:Segment中元素的數量,它是volatile,用來協調修改和讀取操作,以保證讀取操作能夠讀取到幾乎最新的修改。協調方式是這樣的,每次修改操作做了結構上的改變,如增加/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操作開始都要讀取count值。這利用了Java 5中對volatile語義的增強,對同一個volatile變數的寫和讀存在happens-before關係(對一個volatile域的寫,happens-before於任意後續對這個volatile域的讀,即寫操作的執行結果,對讀操作可見)
  • modCount:統計段結構改變的次數,主要是為了檢測對多個段進行遍歷過程中某個段是否發生改變,在講述跨段操作時還會講述。
  • threshold:用來表示需要進行rehash的界限值,超過該閾值,則對Segment中陣列的大小進行擴容。
  • table:table也是volatile,這使得能夠讀取到最新的table值而不需要同步。
  • loadFactor:表示負載因子,用於確定threshold。

HashEntry

  Segment中的元素是以HashEntry的形式存放在連結串列陣列中的,看一下HashEntry的結構:

1 static final class HashEntry<K,V> {
2     final K key;
3     final int hash;
4     volatile V value;
5     final HashEntry<K,V> next;
6 }

  可以看到HashEntry的一個特點,除了value以外,其它的幾個變數都是final的,這意味著不能從hash鏈的中間或尾部新增或刪除節點,因為這需要修改next引用值,所有的節點的修改只能從頭部開始。對於put操作,可以一律新增到HashEntry鏈的頭部(next為final型,它的唯一一次賦值可以發生在構造方法中,即可以使用new HashEntry(...),在構造方法HashEntry(...)中放入key,hash,value,並將next賦值為原HashEntry鏈的頭部。PS:太巧妙了TAT)。但是對於remove操作,可能需要從中間刪除一個節點,這就需要將要刪除的節點的前面所有節點整個複製一遍(使用new HashEntry(...)在構造方法中將傳入待複製的節點的key,hash,value,然後將next指向新節點),最後一個節點指向要刪除節點的下一個節點。將value設定成volatile,這避免了加鎖。

 

ConcurrentHashMap的初始化

  下面結合原始碼來具體分析一下ConcurrentHashMap的實現,先看下初始化方法:

 1 public ConcurrentHashMap(int initialCapacity,
 2                          float loadFactor, int concurrencyLevel) {
 3     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 4         throw new IllegalArgumentException();
 5   
 6     if (concurrencyLevel > MAX_SEGMENTS)
 7         concurrencyLevel = MAX_SEGMENTS;
 8   
 9     // Find power-of-two sizes best matching arguments
10     int sshift = 0;
11     int ssize = 1;
12     // 保證ssize一定為2的指數個
13     // 如concurrencyLevel為11,12,13,14,15,16時,ssize都為16
14     while (ssize < concurrencyLevel) {  
15         ++sshift;   // 記錄ssize左移的次數
16         ssize <<= 1;
17     }
18     // 這兩個全域性變數在定位segment時的雜湊演算法裡需要使用
19     segmentShift = 32 - sshift; // 之所以用32是因為ConcurrentHashMap裡的hash()方法輸出的最大數是32位的
20     segmentMask = ssize - 1;    // 為雜湊運算的掩碼,等於ssize,保證其二進位制位都是1
21     this.segments = Segment.newArray(ssize);
22   
23     if (initialCapacity > MAXIMUM_CAPACITY)
24         initialCapacity = MAXIMUM_CAPACITY;
25     int c = initialCapacity / ssize;    // c為ssize的倍數
26     if (c * ssize < initialCapacity)
27         ++c;
28     int cap = 1;
29     while (cap < c) // 如果c大於1,就會取大於等於c的2的N次方值,所以cap不是1,就是2的N次方
30         cap <<= 1;
31   
32     for (int i = 0; i < this.segments.length; ++i)
33         this.segments[i] = new Segment<K,V>(cap, loadFactor); // segment的容量threshold = (int)cap*loadFactor

  ConcurrentHashMap的初始化一共三個引數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載引數,最後一個是concurrentLevel,代表ConcurrentHashMap內部的Segment的數量,concurrentLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導致ConcurrentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中連結串列陣列的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment裡面的元素做一次rehash就可以了

  整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrentLevel來new出Segment,這裡Segment的數量是不大於concurrentLevel的最大的2的指數(while(ssize < concurrentLevel) {ssize <<= 1}),就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操作來進行hash,加快hash的過程。接下來就是根據initialCapacity確定Segment的容量大小,每一個Segment的容量大小也是2的指數,同樣是為了加快hash的過程。

  這邊需要特別注意兩個變數:segmentShift和segmentMask,這兩個變數在後面將會起很大的作用,假設建構函式確定了segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。

 

ConcurrentHashMap的get操作

  前面提到過ConcurrentHashMap的get操作是不用加鎖的,我們這裡看一下其實現:

1 public V get(Object key) {
2     int hash = hash(key.hashCode());
3     return segmentFor(hash).get(key, hash);
4 }

  看第三行,segmentFor這個函式用來確定操作應該在哪一個segment中進行,幾乎對ConcurrentHashMap的所有操作都需要用到這個函式,我們來看下這個函式的實現:

1 final Segment<K,V> segmentFor(int hash) {
2     return segments[(hash >>> segmentShift) & segmentMask];
3 }

  這個函式用了位操作來確定Segment,根據傳入的hash值向右無符號右移segmentShift位,然後和segmentMask進行與操作,綜合我們之前說的segmentShift和segmentMask的值,就可以得出如下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就可以確定元素到底在哪一個Segment中

  在確定了需要在哪一個segment中進行操作後,接下來的事情就是呼叫對應的segment的get方法:

 1 V get(Object key, int hash) {
 2     if (count != 0) { // read-volatile
 3         HashEntry<K,V> e = getFirst(hash);
 4         while (e != null) {
 5             if (e.hash == hash && key.equals(e.key)) {
 6                 V v = e.value;
 7                 if (v != null)
 8                     return v;
 9                 return readValueUnderLock(e); // recheck
10             }
11             e = e.next;
12         }
13     }
14     return null;
15 }

  先看第二行程式碼,這裡對count進行了一次判斷,其中count表示該Segment中包含的元素的數量,我們可以來看一下count的定義:

transient volatile int count;

  可以看到count是volatile的,實際上這裡面利用了volatile的語義:

  “對volatile欄位的寫入操作happens-before於每一個後續的同一個欄位的讀操作。”

  因為實際上put、remove等操作也會更新count的值,所以當競爭發生的時候,volatile的語義可以保證寫操作在讀操作之前,可就保證了寫操作對後續的讀操作都是可見的。

  通過這種機制來保證get操作能夠得到幾乎最新的結構更新。對於非結構更新(也就是結點值的改變),由於HashEntry的value變數是volatile的,也能保證讀取到“最新”的值。接下來就是對hash鏈進行遍歷找到要獲取的節點,如果沒有找到,直接返回null。對hash鏈進行遍歷不需要加鎖的原因在於鏈指標next是final的,但是頭指標卻不是final的,頭指標是通過getFirst(hash)方法返回的

1 HashEntry<K,V> getFirst(int hash) {
2     HashEntry<K,V>[] tab = table;
3     return tab[hash & (tab.length - 1)];
4 }

  也就是存在table陣列中的值。這使得getFirst(hash)可能返回過時的頭節點。例如,當執行get方法時,剛執行完getFirst(hash)之後,另一個執行緒執行了刪除操作並更新頭結點,這就導致get方法中返回的頭結點不是最新的。這是可以允許的,通過對count變數的協調機制,get能讀取到幾乎最新的資料,雖然可能不是最新的。要得到最新的資料,只有採用完全的同步

  最後,如果找到了所求的節點,判斷它的值如果非空就直接返回,否則在有鎖的狀態下再讀一次。這似乎有些費解,理論上節點的值不可能為空,這是因為put的時候就進行了判斷,如果為空就丟擲NullPointerException。空值的唯一源頭就是HashEntry中的預設值,因為HashEntry中的value不是final的,非同步讀取有可能讀取到空值。仔細看下put操作的語句:tab[index] = new HashEntry<K,V>(key,hash,first,value),在這條語句中,HashEntry建構函式中對value的賦值以及對tab[index]的賦值可能被重新排序,這就可能導致結點的值為空(HashEntry物件構造好了,但對value的賦值還未完成,此時取到其預設值空)。這種情況應當很罕見,一旦發生這種情況,ConcurrentHashMap採取的方式是在持有鎖的情況下再讀一遍,這能夠保證讀到最新的值,並且一定不會為空值。

1 V readValueUnderLock(HashEntry<K,V> e) {
2     lock();
3     try {
4         return e.value;
5          } finally {
6                 unlock();
7             }
8 }

  

ConcurrentHashMap的put操作

  看完了get操作,再看下put操作,put操作的前面也是確定Segment的過程,這裡不再贅述,直接看關鍵的segment的put方法:

 1 V put(K key, int hash, V value, boolean onlyIfAbsent) {
 2     lock();
 3     try {
 4         int c = count;
 5         if (c++ > threshold) // ensure capacity
 6             rehash();
 7         HashEntry<K,V>[] tab = table;
 8         int index = hash & (tab.length - 1);
 9         HashEntry<K,V> first = tab[index];
10         HashEntry<K,V> e = first;
11         while (e != null && (e.hash != hash || !key.equals(e.key)))
12             e = e.next;
13   
14         V oldValue;
15         if (e != null) {
16             oldValue = e.value;
17             if (!onlyIfAbsent)
18                 e.value = value;
19         }
20         else {
21             oldValue = null;
22             ++modCount;
23             tab[index] = new HashEntry<K,V>(key, hash, first, value);
24             count = c; // write-volatile
25         }
26         return oldValue;
27     } finally {
28         unlock();
29     }
30 }

  該方法是在持有段鎖的情況下執行的,在第五行,如果Segment中元素的數量超過了閾值(由建構函式中的loadFactor算出)就需要對segment進行擴容,並且要進行rehash,關於rehash的過程大家可以自己去了解,這裡不詳細講了。

  第8行和第9行的操作就是getFirst的過程,確定連結串列頭部的位置。

  第11行這裡的這個while迴圈是在連結串列中尋找和要put的元素相同key的元素,如果找到,就直接更新key的value,如果沒有找到,則進入21行這裡,生成一個新的HashEntry並且把它加到整個segment的頭部,然後再更新count值。

 

ConcurrentHashMap的remove操作

  remove操作的前面一部分和前面的get、put操作一樣,都是定位segment的過程,然後再呼叫segment的remove方法:

 1 V remove(Object key, int hash, Object value) {
 2     lock();
 3     try {
 4         int c = count - 1;
 5         HashEntry<K,V>[] tab = table;
 6         int index = hash & (tab.length - 1);
 7         HashEntry<K,V> first = tab[index];
 8         HashEntry<K,V> e = first;
 9         while (e != null && (e.hash != hash || !key.equals(e.key)))
10             e = e.next;
11   
12         V oldValue = null;
13         if (e != null) {
14             V v = e.value;
15             if (value == null || value.equals(v)) {
16                 oldValue = v;
17                 // All entries following removed node can stay
18                 // in list, but all preceding ones need to be
19                 // cloned.
20                 ++modCount;
21                 HashEntry<K,V> newFirst = e.next;
22                 for (HashEntry<K,V> p = first; p != e; p = p.next)
23                     newFirst = new HashEntry<K,V>(p.key, p.hash,
24                                                   newFirst, p.value);
25                 tab[index] = newFirst;
26                 count = c; // write-volatile
27             }
28         }
29         return oldValue;
30     } finally {
31         unlock();
32     }
33 }

  整個操作是在持有段鎖的情況下執行的,空白行之前的行主要是定位到要刪除的節點e。接下來,如果不存在這個節點就直接返回null,否則就要將e前面的節點複製一遍,尾節點指向e的下一個節點。e後面的節點不需要複製,它們可以重用。(之前說過HashEntry中的next是final的,一經賦值以後就不可修改,所以只能通過複製來達成目的)如下圖所示:

 

 

  

   注意,複製的節點中,值為2的節點在前面,值為1的節點在後面,也就是剛好和原來節點順序相反

  整個remobe實現並不複雜,但是需要注意如下幾點:第一,當要刪除的節點存在時,刪除的最後一步操作要將count的值減一。這必須是最後一步操作,否則讀取操作可能看不到之前對段所做的結構性修改。第二,remove執行的開始就將table賦值給一個區域性變數tab,這是因為table是volatile變數,讀寫volatile變數的開銷很大。編譯器也不能對volatile變數的讀寫做任何優化,直接多次訪問非volatile例項變數沒有多大影響,編譯器會做相應優化。

 

跨段操作

  有些操作需要涉及到多個段,比如說size(),containsValue()。先來看下size()方法:

 1 public int size() {
 2     final Segment<K,V>[] segments = this.segments;
 3     long sum = 0;
 4     long check = 0;
 5     int[] mc = new int[segments.length];
 6     // Try a few times to get accurate count. On failure due to
 7     // continuous async changes in table, resort to locking.
 8     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
 9         check = 0;
10         sum = 0;
11         int mcsum = 0;
12         for (int i = 0; i < segments.length; ++i) {
13             sum += segments[i].count;
14             mcsum += mc[i] = segments[i].modCount;
15         }
16         if (mcsum != 0) {
17             for (int i = 0; i < segments.length; ++i) {
18                 check += segments[i].count;
19                 if (mc[i] != segments[i].modCount) {
20                     check = -1; // force retry
21                     break;
22                 }
23             }
24         }
25         if (check == sum)
26             break;
27     }
28     if (check != sum) { // Resort to locking all segments
29         sum = 0;
30         for (int i = 0; i < segments.length; ++i)
31             segments[i].lock();
32         for (int i = 0; i < segments.length; ++i)
33             sum += segments[i].count;
34         for (int i = 0; i < segments.length; ++i)
35             segments[i].unlock();
36     }
37     if (sum > Integer.MAX_VALUE)
38         return Integer.MAX_VALUE;
39     else
40         return (int)sum;
41 }

  size方法主要思路是先在沒有鎖的情況下對所有段大小求和,如果不能成功(這是因為遍歷過程中可能有其它執行緒正在對已經遍歷過的段進行結構性更新),最多執行RETRIES_BEFORE_LOCK次,如果還不成功就在持有所有段鎖的情況下再對所有段大小求和。在沒有鎖的情況下主要是利用Segment中的modCount進行檢測,在遍歷過程中儲存每個Segment的modCount,遍歷完成之後再檢測每個Segment的modCount有沒有改變,如果有改變表示有其它執行緒正在對Segment進行結構性併發更新,需要重新計算。

  size()的實現還有一點需要注意,必須要先segments[i].count,才能segments[i].modCount,這是因為segment[i].count是對volatile變數的訪問,接下來segments[i].modCount才能得到幾乎最新的值(前面我已經說了為什麼只是“幾乎”了)。這點在containsValue方法中得到了淋漓盡致的展現:

 1 public boolean containsValue(Object value) {
 2     if (value == null)
 3         throw new NullPointerException();
 4 
 5     // See explanation of modCount use above
 6 
 7     final Segment<K,V>[] segments = this.segments;
 8     int[] mc = new int[segments.length];
 9 
10     // Try a few times without locking
11     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
12         int sum = 0;
13         int mcsum = 0;
14         for (int i = 0; i < segments.length; ++i) {
15             int c = segments[i].count;
16             mcsum += mc[i] = segments[i].modCount;
17             if (segments[i].containsValue(value))
18                 return true;
19         }
20         boolean cleanSweep = true;
21         if (mcsum != 0) {
22             for (int i = 0; i < segments.length; ++i) {
23                 int c = segments[i].count;
24                 if (mc[i] != segments[i].modCount) {
25                     cleanSweep = false;
26                     break;
27                 }
28             }
29         }
30         if (cleanSweep)
31             return false;
32     }
33     // Resort to locking all segments
34     for (int i = 0; i < segments.length; ++i)
35         segments[i].lock();
36     boolean found = false;
37     try {
38         for (int i = 0; i < segments.length; ++i) {
39             if (segments[i].containsValue(value)) {
40                 found = true;
41                 break;
42             }
43         }
44     } finally {
45         for (int i = 0; i < segments.length; ++i)
46             segments[i].unlock();
47     }
48     return found;
49 }

  注意內層的第一個for迴圈,裡面有語句int c = segments[i].count;但是c卻從來沒有被使用過,即時如此,編譯器也不能做優化將這條語句去掉,因為存在對volatile變數count的讀取,這條語句存在的唯一目的就是保證segments[i].modCount讀取到幾乎最新的值。

 

解釋“必須要先segments[i].count,才能segments[i].modCount,這是因為segment[i].count是對volatile變數的訪問,接下來segments[i].modCount才能得到幾乎最新的值”


  寫volatile變數和它之前的讀寫操作是不能重排序reorder的,讀volatile變數和它之後的讀寫操作也是不能reorder的。

  在此程式中,表現為修改modCount發生在修改count之前(檢視原始碼會發現在寫count之前必定有寫modCount),由於count是volatile變數,修改modCount不能和寫count的操作reorder,讀取count和它之後的操作,比如讀取modCount,不能reorder。有了這兩個“不能reorder”才能保證讀取了count之後,能讀到執行緒在寫count之前寫入的modCount值,這個modCount值是幾乎最新的。如果在讀modCount之前不讀count,讀modCount甚至可能會reorder到寫modCount之前。如果寫modCount放在寫count之後,則寫modCount可能會被reorder到讀modCount之後。即讀寫順序需要相互配合,才能保證讀取到的modCount幾乎是最新的


  最後簡單地介紹下迭代方法,如keySet(),values(),entrySet()方法,這些方法都返回相應的迭代器,所有迭代器都繼承於HashIterator類裡實現的主要的方法。其結構是:

1 abstract class HashIterator {
2     int nextSegmentIndex;
3     int nextTableIndex;
4     HashEntry<K,V>[] currentTable;
5     HashEntry<K, V> nextEntry;
6     HashEntry<K, V> lastReturned;
7 }

  nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex對應段中hash鏈的索引,currentTable是nextSegmentIndex對應段的table。呼叫next方法時主要是呼叫了advance()方法:

 1 final void advance() {
 2     if (nextEntry != null && (nextEntry = nextEntry.next) != null)
 3         return;
 4 
 5     while (nextTableIndex >= 0) {
 6         if ( (nextEntry = currentTable[nextTableIndex--]) != null)
 7             return;
 8     }
 9 
10     while (nextSegmentIndex >= 0) {
11         Segment<K,V> seg = segments[nextSegmentIndex--];
12         if (seg.count != 0) {
13             currentTable = seg.table;
14             for (int j = currentTable.length - 1; j >= 0; --j) {
15                 if ( (nextEntry = currentTable[j]) != null) {
16                     nextTableIndex = j - 1;
17                     return;
18                 }
19             }
20         }
21     }
22 }

  不想再多介紹了,唯一需要注意的是跳到下一個段時,一定要先讀取下一個段的count變數。

  這種迭代方式的主要效果是不會丟擲ConcurrentModificationException。一旦獲取到下一個段的table,也就意味著這個段的頭結點在迭代過程中就確定了,在迭代過程中就不能反映對這個段節點併發的刪除和新增,對於節點的更新是能夠反映的,因為節點的值是一個volatile變數。

 

參考文章:《Java併發程式設計:併發容器之oncurrentHashMap

      《ConcurrentHashMap之實現細節