1. 程式人生 > >【java基礎】ConcurrentHashMap實現原理及原始碼分析

【java基礎】ConcurrentHashMap實現原理及原始碼分析

  ConcurrentHashMap是Java併發包中提供的一個執行緒安全且高效的HashMap實現(若對HashMap的實現原理還不甚瞭解,可參考我的另一篇文章),ConcurrentHashMap在併發程式設計的場景中使用頻率非常之高,本文就來分析下ConcurrentHashMap的實現原理,並對其實現原理進行分析(JDK1.7).

ConcurrentHashMap實現原理

  眾所周知,雜湊表是中非常高效,複雜度為O(1)的資料結構,在Java開發中,我們最常見到最頻繁使用的就是HashMap和HashTable,但是線上程競爭激烈的併發場景中使用都不夠合理。

  HashMap

 :先說HashMap,HashMap是執行緒不安全的,在併發環境下,可能會形成環狀連結串列(擴容時可能造成,具體原因自行百度google或檢視原始碼分析),導致get操作時,cpu空轉,所以,在併發環境中使用HashMap是非常危險的。

  HashTable : HashTable和HashMap的實現原理幾乎一樣,差別無非是1.HashTable不允許key和value為null;2.HashTable是執行緒安全的。但是HashTable執行緒安全的策略實現代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,這相當於給整個雜湊表加了一把大鎖,多執行緒訪問時候,只要有一個執行緒訪問或操作該物件,那其他執行緒只能阻塞,相當於將所有的操作序列化

,在競爭激烈的併發場景中效能就會非常差。

  HashTable效能差主要是由於所有操作需要競爭同一把鎖,而如果容器中有多把鎖,每一把鎖鎖一段資料,這樣在多執行緒訪問時不同段的資料時,就不會存在鎖競爭了,這樣便可以有效地提高併發效率。這就是ConcurrentHashMap所採用的"分段鎖"思想。

  

ConcurrentHashMap原始碼分析   

ConcurrentHashMap採用了非常精妙的"分段鎖"策略,ConcurrentHashMap的主幹是個Segment陣列。

 final Segment<K,V>[] segments;

  Segment繼承了ReentrantLock,所以它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap,一個Segment就是一個子雜湊表,Segment裡維護了一個HashEntry陣列,併發環境下,對於不同Segment的資料進行操作是不用考慮鎖競爭的。(就按預設的ConcurrentLeve為16來講,理論上就允許16個執行緒併發執行,有木有很酷)

  所以,對於同一個Segment的操作才需考慮執行緒同步,不同的Segment則無需考慮。

Segment類似於HashMap,一個Segment維護著一個HashEntry陣列

 transient volatile HashEntry<K,V>[] table;

HashEntry是目前我們提到的最小的邏輯處理單元了。一個ConcurrentHashMap維護一個Segment陣列,一個Segment維護一個HashEntry陣列。

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        //其他省略
}    

我們說Segment類似雜湊表,那麼一些屬性就跟我們之前提到的HashMap差不離,比如負載因子loadFactor,比如閾值threshold等等,看下Segment的構造方法

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;//負載因子
            this.threshold = threshold;//閾值
            this.table = tab;//主幹陣列即HashEntry陣列
        }

我們來看下ConcurrentHashMap的構造方法

public ConcurrentHashMap(int initialCapacity,
                               float loadFactor, int concurrencyLevel) {
          if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
              throw new IllegalArgumentException();
          //MAX_SEGMENTS 為1<<16=65536,也就是最大併發數為65536
          if (concurrencyLevel > MAX_SEGMENTS)
              concurrencyLevel = MAX_SEGMENTS;
          //2的sshif次方等於ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
         int sshift = 0;
         //ssize 為segments陣列長度,根據concurrentLevel計算得出
         int ssize = 1;
         while (ssize < concurrencyLevel) {
             ++sshift;
             ssize <<= 1;
         }
         //segmentShift和segmentMask這兩個變數在定位segment時會用到,後面會詳細講
         this.segmentShift = 32 - sshift;
         this.segmentMask = ssize - 1;
         if (initialCapacity > MAXIMUM_CAPACITY)
             initialCapacity = MAXIMUM_CAPACITY;
         //計算cap的大小,即Segment中HashEntry的陣列長度,cap也一定為2的n次方.
         int c = initialCapacity / ssize;
         if (c * ssize < initialCapacity)
             ++c;
         int cap = MIN_SEGMENT_TABLE_CAPACITY;
         while (cap < c)
             cap <<= 1;
         //建立segments陣列並初始化第一個Segment,其餘的Segment延遲初始化
         Segment<K,V> s0 =
             new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                              (HashEntry<K,V>[])new HashEntry[cap]);
         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
         UNSAFE.putOrderedObject(ss, SBASE, s0); 
         this.segments = ss;
     }

  初始化方法有三個引數,如果使用者不指定則會使用預設值,initialCapacity為16,loadFactor為0.75(負載因子,擴容時需要參考),concurrentLevel為16。

  從上面的程式碼可以看出來,Segment陣列的大小ssize是由concurrentLevel來決定的,但是卻不一定等於concurrentLevel,ssize一定是大於或等於concurrentLevel的最小的2的次冪。比如:預設情況下concurrentLevel是16,則ssize為16;若concurrentLevel為14,ssize為16;若concurrentLevel為17,則ssize為32。為什麼Segment的陣列大小一定是2的次冪?其實主要是便於通過按位與的雜湊演算法來定位Segment的index。至於更詳細的原因,有興趣的話可以參考我的另一篇文章《HashMap實現原理及原始碼分析》,其中對於陣列長度為什麼一定要是2的次冪有較為詳細的分析。

接下來,我們來看看put方法

public V put(K key, V value) {
        Segment<K,V> s;
        //concurrentHashMap不允許key/value為空
        if (value == null)
            throw new NullPointerException();
        //hash函式對key的hashCode重新雜湊,避免差勁的不合理的hashcode,保證雜湊均勻
        int hash = hash(key);
        //返回的hash值無符號右移segmentShift位與段掩碼進行位運算,定位segment
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

 從原始碼看出,put的主要邏輯也就兩步:1.定位segment並確保定位的Segment已初始化 2.呼叫Segment的put方法。

 關於segmentShift和segmentMask

 segmentShift和segmentMask這兩個全域性變數的主要作用是用來定位Segment,int j =(hash >>> segmentShift) & segmentMask。

  segmentMask:段掩碼,假如segments陣列長度為16,則段掩碼為16-1=15;segments長度為32,段掩碼為32-1=31。這樣得到的所有bit位都為1,可以更好地保證雜湊的均勻性

  segmentShift:2的sshift次方等於ssize,segmentShift=32-sshift。若segments長度為16,segmentShift=32-4=28;若segments長度為32,segmentShift=32-5=27。而計算得出的hash值最大為32位,無符號右移segmentShift,則意味著只保留高几位(其餘位是沒用的),然後與段掩碼segmentMask位運算來定位Segment。

  get/put方法

get方法

 public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        //先定位Segment,再定位HashEntry
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

get方法無需加鎖,由於其中涉及到的共享變數都使用volatile修飾,volatile可以保證記憶體可見性,所以不會讀取到過期資料。

  來看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加鎖的。只不過是鎖粒度細了而已。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);//tryLock不成功時會遍歷定位到的HashEnry位置的連結串列(遍歷主要是為了使CPU快取連結串列),若找不到,則建立HashEntry。tryLock一定次數後(MAX_SCAN_RETRIES變數決定),則lock。若遍歷過程中,由於其他執行緒的操作導致連結串列頭結點變化,則需要重新遍歷。
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,這個hash值在定位Segment時和在Segment中定位HashEntry都會用到,只不過定位Segment時只用到高几位。
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
              //若c超出閾值threshold,需要擴容並rehash。擴容後的容量是當前容量的2倍。這樣可以最大程度避免之前雜湊好的entry重新雜湊,具體在另一篇文章中有詳細分析,不贅述。擴容並rehash的這個過程是比較消耗資源的。
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

 總結

  ConcurrentHashMap作為一種執行緒安全且高效的雜湊表的解決方案,尤其其中的"分段鎖"的方案,相比HashTable的全表鎖在效能上的提升非常之大。本文對ConcurrentHashMap的實現原理進行了詳細分析,並解讀了部分原始碼,希望能幫助到有需要的童鞋。