1. 程式人生 > >java基礎系列之ConcurrentHashMap源碼分析(基於jdk1.8)

java基礎系列之ConcurrentHashMap源碼分析(基於jdk1.8)

threshold 主存 類比 tile num method 過程 參數 nsf

1、前提

  在閱讀這篇博客之前,希望你對HashMap已經是有所理解的;另外你對java的cas操作也是有一定了解的,因為在這個類中大量使用到了cas相關的操作來保證線程安全的。

  2、概述

  ConcurrentHashMap這個類在java.lang.current包中,這個包中的類都是線程安全的。ConcurrentHashMap底層存儲數據的結構與1.8的HashMap是一樣的,都是數組+鏈表(或紅黑樹)的結構。在日常的開發中,我們最長用到的鍵值對存儲結構的是HashMap,但是我們知道,這個類是非線程安全的,在高並發的場景下,在進行put操作的時候有可能進入死循環從而使服務器的cpu使用率達到100%;sun公司因此也給出了與之對應的線程安全的類。在jdk1.5以前,使用的是HashTable,這個類為了保證線程安全,在每個類中都添加了synchronized關鍵字,而想而知在高並發的情景下相率是非常低下的。為了解決HashTable效率低下的問題,官網在jdk1.5後推出了ConcurrentHashMap來替代飽受詬病的HashTable。jdk1.5後ConcurrentHashMap使用了分段鎖的技術。在整個數組中被分為多個segment,每次get,put,remove操作時就鎖住目標元素所在的segment中,因此segment與segment之前是可以並發操作的,上述就是jdk1.5後實現線程安全的大致思想。但是,從描述中可以看出一個問題,就是如果出現比較機端的情況,所有的數據都集中在一個segment中的話,在並發的情況下相當於鎖住了全表,這種情況下其實是和HashTable的效率出不多的,但總體來說相較於HashTable,效率還是有了很大的提升。jdk1.8後,ConcurrentHashMap摒棄了segment的思想,轉而使用cas+synchronized組合的方式來實現並發下的線程安全的,這種實現方式比1.5的效率又有了比較大的提升。那麽,它是如何整體提升效率的呢?見下文分析吧!

 3、重要成員變量

  1、ziseCtr:在多個方法中出現過這個變量,該變量主要是用來控制數組的初始化和擴容的,默認值為0,可以概括一下4種狀態:

    a、sizeCtr=0:默認值;

    b、sizeCtr=-1:表示Map正在初始化中;

    c、sizeCtr=-N:表示正在有N-1個線程進行擴容操作;

    d、sizeCtr>0: 未初始化則表示初始化Map的大小,已初始化則表示下次進行擴容操作的閾值;

  2、table:用於存儲鏈表或紅黑數的數組,初始值為null,在第一次進行put操作的時候進行初始化,默認值為16;

  3、nextTable:在擴容時新生成的數組,其大小為當前table的2倍,用於存放table轉移過來的值;

  4、Node:該類存儲數據的核心,以key-value形式來存儲;

  5、ForwardingNode:這是一個特殊Node節點,僅在進行擴容時用作占位符,表示當前位置已被移動或者為null,該node節點的hash值為-1;

  4、put操作

  先把源碼擺上來:

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不能為空
        if (key == null || value == null
) throw new NullPointerException(); //通過key來計算獲得hash值 int hash = spread(key.hashCode()); //用於計算數組位置上存放的node的節點數量 //在put完成後會對這個參數判斷是否需要轉換成紅黑樹或鏈表 int binCount = 0; //使用自旋的方式放入數據 //這個過程是非阻塞的,放入失敗會一直循環嘗試,直至成功 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //第一次put操作,對數組進行初始化,實現懶加載 if (tab == null || (n = tab.length) == 0) //初始化 tab = initTable(); //數組已初始化完成後 //使用cas來獲取插入元素所在的數組的下標的位置,該位置為空的話就直接放進去 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //hash=-1,表明該位置正在進行擴容操作,讓當前線程也幫助該位置上的擴容,並發擴容提高擴容的速度 else if ((fh = f.hash) == MOVED) //幫助擴容 tab = helpTransfer(tab, f); //插入到該位置已有數據的節點上,即用hash沖突 //在這裏為保證線程安全,會對當前數組位置上的第一個節點進行加鎖,因此其他位置上 //仍然可以進行插入,這裏就是jdk1.8相較於之前版本使用segment作為鎖性能要高效的地方 else { V oldVal = null; synchronized (f) { //再一次判斷f節點是否為第一個節點,防止其他線程已修改f節點 if (tabAt(tab, i) == f) { //為鏈表 if (fh >= 0) { binCount = 1; //將節點放入鏈表中 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //為紅黑樹 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; //將節點插入紅黑樹中 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //插入成功後判斷插入數據所在位置上的節點數量, //如果數量達到了轉化紅黑樹的閾值,則進行轉換 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //由鏈表轉換成紅黑樹 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //使用cas統計數量增加1,同時判斷是否滿足擴容需求,進行擴容 addCount(1L, binCount); return null; }

  在代碼上寫註釋可能看得不是很清晰,那麽我就使用文字再來描述一下插入數據的整個流程:

    1、判斷傳進來的key和value是否為空,在ConcurrentHashMap中key和value都不允許為空,然而在HashMap中是可以為key和val都可以為空,這一點值得註意一下;

    2、對key進行重hash計算,獲得hash值;

    3、如果當前的數組為空,說明這是第一插入數據,則會對table進行初始化;

    4、插入數據,這裏分為3中情況:

      1)、插入位置為空,直接將數據放入table的第一個位置中;

      2)、插入位置不為空,並且改為是一個ForwardingNode節點,說明該位置上的鏈表或紅黑樹正在進行擴容,然後讓當前線程加進去並發擴容,提高效率;

      3)、插入位置不為空,也不是ForwardingNode節點,若為鏈表則從第一節點開始組個往下遍歷,如果有key的hashCode相等並且值也相等,那麽就將該節點的數據替換掉,

        否則將數據加入  到鏈表末段;若為紅黑樹,則按紅黑樹的規則放進相應的位置;

    5、數據插入成功後,判斷當前位置上的節點的數量,如果節點數據大於轉換紅黑樹閾值(默認為8),則將鏈表轉換成紅黑樹,提高get操作的速度;

    6、數據量+1,並判斷當前table是否需要擴容;

  所以,put操作流程可以簡單的概括為上面的六個步驟,其中一些具體的操作會在下面進行詳細的說明,不過,值得註意的是:

    1、ConcurrentHashMap不可以存儲key或value為null的數據,有別於HashMap;

    2、ConcurrentHashMap使用了懶加載的方式初始化數據,把table的初始化放在第一次put數據的時候,而不是在new的時候;

    3、擴容時是支持並發擴容,這將有助於減少擴容的時間,因為每次擴容都需要對每個節點進行重hash,從一個table轉移到新的table中,這個過程會耗費大量的時間和cpu資源。

    4、插入數據操作鎖住的是表頭,這是並發效率高於jdk1.7的地方;

  4.1、hash計算的spread方法

/**
     * Spreads (XORs) higher bits of hash to lower and also forces top
     * bit to 0. Because the table uses power-of-two masking, sets of
     * hashes that vary only in bits above the current mask will
     * always collide. (Among known examples are sets of Float keys
     * holding consecutive whole numbers in small tables.)  So we
     * apply a transform that spreads the impact of higher bits
     * downward. There is a tradeoff between speed, utility, and
     * quality of bit-spreading. Because many common sets of hashes
     * are already reasonably distributed (so don‘t benefit from
     * spreading), and because we use trees to handle large sets of
     * collisions in bins, we just XOR some shifted bits in the
     * cheapest possible way to reduce systematic lossage, as well as
     * to incorporate impact of the highest bits that would otherwise
     * never be used in index calculations because of table bounds.
     */
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

  從源碼中可以看到,jdk1.8計算hash的方法是先獲取到key的hashCode,然後對hashCode進行高16位和低16位異或運算,然後再與 0x7fffffff 進行與運算。高低位異或運算可以保證haahCode的每一位都可以參與運算,從而使運算的結果更加均勻的分布在不同的區域,在計算table位置時可以減少沖突,提高效率,我們知道Map在put操作時大部分性能都耗費在解決hash沖突上面。得出運算結果後再和 0x7fffffff 與運算,其目的是保證每次運算結果都是一個正數。對於java位運算不了解的同學,建議百度自行了解相關內容。

  4.2、java內存模型和cas操作

  這裏我只是簡單的說一下java的內存模型和cas,因為這篇文章的主角的ConcurrentHashMap。

  java內存模型:在java中線程之間的通訊是通過共享內存(即我們在變成時聲明的成員變量或叫全局變量)的來實現的。Java內存模型中規定了所有的變量都存儲在主內存中,每條線程還有自己的工作內存(可以與前面將的處理器的高速緩存類比),線程的工作內存中保存了該線程使用到的變量到主內存副本拷貝,線程對變量的所有操作(讀取、賦值)都必須在工作內存中進行,而不能直接讀寫主內存中的變量。不同線程之間無法直接訪問對方工作內存中的變量,線程間變量值的傳遞均需要在主內存來完成,線程、主內存和工作內存的交互關系如下圖所示,和上圖很類似。技術分享圖片

  舉一個非常簡單的例子,就是我們常用的i++的操作,這個操作看起來只有一行,然而在編譯器中這一行代碼會被編譯成3條指令,分別是讀取、更新和寫入,所以i++並不是一個原子操作,在多線程環境中是有問題了。其原因在於(我們假設當前 i 的值為1)當一條線程向主內存中讀取數據時,還沒來得及把更新後的值刷新到主內存中,另一個線程就已經開始向主內存中讀取了數據,而此時內存中的值仍然為1,兩個線程執行+1操作後得到的結果都為2,然後將結果刷新到主內存中,整個i++操作結果,最終得到的結果為2,但是我們預想的結果應該是3,這就出現了線程安全的問題了。

  cas: cas的全名稱是Compare And Swap 即比較交換。cas算法在不需要加鎖的情況也可以保證多線程安全。核心思想是: cas中有三個變量,要更新的變量V,預期值E和新值N,首先先讀取V的值,然後進行相關的操作,操作完成後再向主存中讀取一次取值為E,當且僅當V == E時才將N賦值給V,否則再走一遍上訴的流程,直至更新成功為止。就拿上面的i++的操作來做說明,假設當前i=1,兩個線程同時對i進行+1的操作,線程A中V = 1,E = 1,N = 2;線程B中 V = 1,E = 1,N = 2;假設線程A先執行完整個操作,此時線程A發現 V = E = 1,所以線程A將N的值賦值給V,那麽此時i的值就變成了 2 ;線程B隨後也完成了操作,向主存中讀取i的值,此時E = 2,V = 1,V != E,發現兩個並不相等,說明i已經被其他線程修改了,因此不執行更新操作,而是從新讀取V的值V = 2 ,執行+1後N = 3,完成後再讀取主存中i的值,因為此時沒有其他線程修改i的值了,所以E = 2,V = E = 2,兩個值相等,因此執行賦值操作,將N的值賦值給i,最終得到的結果為3。在整過過程中始終沒有使用到鎖,卻實現的線程的安全性。

  從上面的過程知道,cas會面臨著兩個問題,一個是當線程一直更新不成功的話,那麽這個線程就一直處於死循環中,這樣會非常耗費cpu的資源;另一種是ABA的問題,即對i =1進行+1操作後,再-1,那麽此時i的值仍為1,而另外一個線程獲取的E的值也是1,認為其他線程沒有修改過i,然後進行的更新操作,事實上已經有其他線程修改過了這個值了,這個就是 A ---> B ---> A 的問題;

  4.3、獲取table對應的索引元素的位置

  通過(n-1)& hash 的算法來獲得對應的table的下標的位置。

  tabAt(Node<K,V>[] tab, int i): 這個方法使用了java提供的原子操作的類來操作的,sun.misc.Unsafe.getObjectVolatile 的方法來保證每次線程都能獲取到最新的值;

  casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v): 這個方法是通過cas的方式來獲取i位置的元素;

  4.4、擴容

  - 如果新增節點之後,所在的鏈表的元素個數大於等於8,則會調用treeifyBin把鏈表轉換為紅黑樹。在轉換結構時,若tab的長度小於MIN_TREEIFY_CAPACITY,默認值為64,

  則會將數組長度擴大到原來的兩倍,並觸發transfer,重新調整節點位置。(只有當tab.length >= 64, ConcurrentHashMap才會使用紅黑樹。)
  - 新增節點後,addCount統計tab中的節點個數大於閾值(sizeCtl),會觸發transfer,重新調整節點位置。

/**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don‘t check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

  5、get操作

  get操作中沒有使用到同步的操作,所以相對來說比較簡單一點。通過key的hashCode計算獲得相應的位置,然後在遍歷該位置上的元素,找到需要的元素,然後返回,如果沒有則返回null:

/**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

java基礎系列之ConcurrentHashMap源碼分析(基於jdk1.8)