1. 程式人生 > >Java7/8 中 HashMap 和 ConcurrentHashMap的對比和分析

Java7/8 中 HashMap 和 ConcurrentHashMap的對比和分析

網上關於 HashMap 和 ConcurrentHashMap 的文章確實不少,不過缺斤少兩的文章比較多,所以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。

閱讀建議:四節基本上可以進行獨立閱讀,建議初學者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 順序進行閱讀,可適當降低閱讀門檻。

閱讀前提:本文分析的是原始碼,所以至少讀者要熟悉它們的介面使用,同時,對於併發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查詢相關資料。

Java7 HashMap

HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支援併發操作,所以原始碼也非常簡單。

首先,我們用下面這張圖來介紹 HashMap 的結構。

1

這個僅僅是示意圖,因為沒有考慮到陣列要擴容的情況,具體的後面再說。

大方向上,HashMap 裡面是一個數組,然後陣列中每個元素是一個單向連結串列。

上圖中,每個綠色的實體是巢狀類 Entry 的例項,Entry 包含四個屬性:key, value, hash 值和用於單向連結串列的 next。

capacity:當前陣列容量,始終保持 2^n,可以擴容,擴容後陣列大小為當前的 2 倍。

loadFactor:負載因子,預設為 0.75。

threshold:擴容的閾值,等於 capacity * loadFactor

put 過程分析

還是比較簡單的,跟著程式碼走一遍吧。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public V put(K key, V value) {      // 當插入第一個元素的時候,需要先初始化陣列大小      if (table == EMPTY_TABLE) {          inflateTable(threshold);      }      // 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中      if (key == null )          return putForNullKey(value);      // 1. 求 key 的 hash 值      int hash = hash(key);      // 2. 找到對應的陣列下標      int i = indexFor(hash, table.length);      // 3. 遍歷一下對應下標處的連結串列,看是否有重複的 key 已經存在,      //    如果有,直接覆蓋,put 方法返回舊值就結束了      for (Entry<K,V> e = table[i]; e != null ; e = e.next) {          Object k;          if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {              V oldValue = e.value;              e.value = value;              e.recordAccess( this );              return oldValue;          }      }        modCount++;      // 4. 不存在重複的 key,將此 entry 新增到連結串列中,細節後面說      addEntry(hash, key, value, i);      return null ; }

陣列初始化

在第一個元素插入 HashMap 的時候做一次陣列的初始化,就是先確定初始的陣列大小,並計算陣列擴容的閾值。

1 2 3 4 5 6 7 8 9 10 private void inflateTable( int toSize) {      // 保證陣列大小一定是 2 的 n 次方。      // 比如這樣初始化:new HashMap(20),那麼處理成初始陣列大小是 32      int capacity = roundUpToPowerOf2(toSize);      // 計算擴容閾值:capacity * loadFactor      threshold = ( int ) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1 );      // 算是初始化陣列吧      table = new Entry[capacity];      initHashSeedAsNeeded(capacity); //ignore }

這裡有一個將陣列大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的程式碼稍微有些不同,後面再看到的時候就知道了。

計算具體陣列位置

這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對陣列長度進行取模就可以了。

1 2 3 4 static int indexFor( int hash, int length) {      // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";      return hash & (length- 1 ); }

這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在陣列長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在陣列中的下標位置。

新增節點到連結串列中

找到陣列下標後,會先進行 key 判重,如果沒有重複,就準備將新值放入到連結串列的表頭。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void addEntry( int hash, K key, V value, int bucketIndex) {      // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容      if ((size >= threshold) && ( null != table[bucketIndex])) {          // 擴容,後面會介紹一下          resize( 2 * table.length);          // 擴容以後,重新計算 hash 值          hash = ( null != key) ? hash(key) : 0 ;          // 重新計算擴容後的新的下標          bucketIndex = indexFor(hash, table.length);      }      // 往下看      createEntry(hash, key, value, bucketIndex); } // 這個很簡單,其實就是將新值放到連結串列的表頭,然後 size++ void createEntry( int hash, K key, V value, int bucketIndex) {      Entry<K,V> e = table[bucketIndex];      table[bucketIndex] = new Entry<>(hash, key, value, e);      size++; }

這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的資料插入到擴容後的陣列的相應位置處的連結串列的表頭。

陣列擴容

前面我們看到,在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的陣列位置上已經有元素,那麼就會觸發擴容,擴容後,陣列大小為原來的 2 倍。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 void resize( int newCapacity) {      Entry[] oldTable = table;      int oldCapacity = oldTable.length;      if (oldCapacity == MAXIMUM_CAPACITY) {          threshold = Integer.MAX_VALUE;          return ;      }      // 新的陣列      Entry[] newTable = new Entry[newCapacity];      // 將原來陣列中的值遷移到新的更大的陣列中      transfer(newTable, initHashSeedAsNeeded(newCapacity));      table = newTable;      threshold = ( int )Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1 ); }

擴容就是用一個新的大陣列替換原來的小陣列,並將原來陣列中的值遷移到新的陣列中。

由於是雙倍擴容,遷移過程中,會將原來 table[i] 中的連結串列的所有節點,分拆到新的陣列的 newTable[i] 和 newTable[i + oldLength] 位置上。如原來陣列長度是 16,那麼擴容後,原來 table[0] 處的連結串列中的所有元素會被分配到新陣列中 newTable[0] 和 newTable[16] 這兩個位置。程式碼比較簡單,這裡就不展開了。

get 過程分析

相對於 put 過程,get 過程是非常簡單的。

  1. 根據 key 計算 hash 值。
  2. 找到相應的陣列下標:hash & (length – 1)。
  3. 遍歷該陣列位置處的連結串列,直到找到相等(==或equals)的 key。
1 2 3 4 5 6 7 8 9 public V get(Object key) {      // 之前說過,key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的連結串列就可以了      if (key == null )          return getForNullKey();      //      Entry<K,V> entry = getEntry(key);        return null == entry ? null : entry.getValue(); }

getEntry(key):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final Entry<K,V> getEntry(Object key) {      if (size == 0 ) {          return null ;      }        int hash = (key == null ) ? 0 : hash(key);      // 確定陣列下標,然後從頭開始遍歷連結串列,直到找到為止      for (Entry<K,V> e = table[indexFor(hash, table.length)];           e != null ;           e = e.next) {          Object k;          if (e.hash == hash &&              ((k = e.key) == key || (key != null && key.equals(k))))              return e;      }      return null ; }

Java7 ConcurrentHashMap

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援併發操作,所以要複雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。

簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。

3

concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些

初始化

initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public ConcurrentHashMap( int initialCapacity,                           float loadFactor, int concurrencyLevel) {      if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 )          throw new IllegalArgumentException();      if (concurrencyLevel > MAX_SEGMENTS)          concurrencyLevel = MAX_SEGMENTS;      // Find power-of-two sizes best matching arguments      int sshift = 0 ;      int ssize = 1 ;      // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方      while (ssize < concurrencyLevel) {          ++sshift;          ssize <<= 1 ;      }      // 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4      // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值      this .segmentShift = 32 - sshift;      this .segmentMask = ssize - 1 ;        if (initialCapacity > MAXIMUM_CAPACITY)          initialCapacity = MAXIMUM_CAPACITY;        // initialCapacity 是設定整個 map 初始的大小,      // 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小      // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個      int c = initialCapacity / ssize;      if (c * ssize < initialCapacity)          ++c;      // 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,      // 插入一個元素不至於擴容,插入第二個的時候才會擴容      int cap = MIN_SEGMENT_TABLE_CAPACITY;      while (cap < c)          cap <<= 1 ;        // 建立 Segment 陣列,      // 並建立陣列的第一個元素 segment[0]      Segment<K,V> s0 =          new Segment<K,V>(loadFactor, ( int )(cap * loadFactor),                           (HashEntry<K,V>[]) new HashEntry[cap]);      Segment<K,V>[] ss = (Segment<K,V>[]) new Segment[ssize];      // 往陣列寫入 segment[0]      UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]      this .segments = ss; }

初始化完成,我們得到了一個 Segment 陣列。

我們就當是用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:

  • Segment 陣列長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
  • 當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

put 過程分析

我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V put(K key, V value) {      Segment<K,V> s;      if (value == null )          throw new NullPointerException();      // 1. 計算 key 的 hash 值      int hash = hash(key);      // 2. 根據 hash 值找到 Segment 陣列中的位置 j      //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,      //    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的陣列下標      int j = (hash >>> segmentShift) & segmentMask;      // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,      // ensureSegment(j) 對 segment[j] 進行初始化      if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck           (segments, (j << SSHIFT) + SBASE)) == null ) //  in ensureSegment          s = ensureSegment(j);      // 3. 插入新值到 槽 s 中      return s.put(key, hash, value, false ); }

第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

Segment 內部是由 陣列+連結串列 組成的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47