1. 程式人生 > >ConcurrentHashMap原始碼探究 (JDK 1.8)

ConcurrentHashMap原始碼探究 (JDK 1.8)

很早就知道在多執行緒環境中,HashMap不安全,應該使用ConcurrentHashMap等併發安全的容器代替,對於ConcurrentHashMap也有一定的瞭解,但是由於沒有深入到原始碼層面,很多理解都是浮於表面,稍微深一點的東西就不是很懂。這兩天終於下定決心將ConcurrentHashMap的原始碼探究了一遍,記錄一下心得體會,算是對閱讀原始碼的一個總結吧。需要提醒讀者注意,因為個人水平有限,且本文字質上來講是留給未來的自己進行查閱的總結,所以難免會有錯漏,一經發現,本人會盡快糾正,也歡迎大家提出寶貴的意見。

1.構造器

先從構造器講起。ConcurrentHashMap共有5個構造器,不論是哪個構造器,最後初始化後的容量都是2的整數冪,這些構造器簽名分別如下:

public ConcurrentHashMap();  //a
public ConcurrentHashMap(int initialCapacity);  //b
public ConcurrentHashMap(Map<? extends K, ? extends V> m);  //c
public ConcurrentHashMap(int initialCapacity, float loadFactor);  //d
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel);  //e

構造器a的方法體是空的,像容量、載入因子這些引數都取預設值;構造器b需要一個初始容量作為引數,程式碼如下:

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

首先檢查引數合法性,如果引數initialCapacity超過了最大允許容量(MAXIMUM_CAPACITY = 1 << 30)的一半,則將容量設定為MAXIMUM_CAPACITY,否則使用tableSizeFor方法來計算容量,最後將sizeCtl引數設定為容量的值。關於sizeCtltableSizeFor等將在後文介紹。
構造器c使用一個外部的map進行初始化,sizeCtl設定為預設容量,然後呼叫putAll方法進行容器初始化和複製操作。

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

構造器d在內部之間呼叫構造器e,這兩個構造器唯一不同的是,構造器e額外提供了一個concurrencyLevel引數,構造器d將這個引數設定為1

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

構造器e的邏輯仍然是先檢查引數合法性,concurrencyLevel引數的唯一作用是作為initialCapacity的下限,除此之外別無他用。從這裡可以看出 JDK 1.7 版本的併發度引數DEFAULT_CONCURRENCY_LEVEL已棄用。
構造器be在設定容器初始容量的時候有一點不同,構造器b使用的是initialCapacity + (initialCapacity >>> 1) + 1(即1.5*initialCapacity+1)作為基礎容量,而構造器e使用的是1.0 + (long)initialCapacity / loadFactor(這裡的initialCapacity實際上相當於HashMap中的threshold,當loadFactor = 2/3時兩者是相等的),在使用的時候需要注意這一點。

2.主要欄位

  • sizeCtl:該欄位出鏡率非常高,取值複雜,要讀懂原始碼,該欄位是必須要弄清楚的。概括來說,該欄位控制內部陣列初始化和擴容操作,其取值如下:
    • 負數
      • -1:表示容器正在初始化。
      • -NN-1個執行緒正在執行擴容。
      • 擴容前會被修改成 (resizeStamp(tab.length) << RESIZE_STAMP_SHIFT) + 2,並且每增加一個擴容執行緒,sizeCtl的值加1,擴容執行緒完成對應桶的遷移工作,sizeCtl1,擴容完成後該值再次被設定成擴容閾值。
    • 正數和0
      • 在初始化的時候表示容器初始容量。
      • 初始化之後表示容器下次擴容的閾值(類似於HashMap中的threshold)。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFTsizeCtl中記錄stamp的兩個欄位,這兩個欄位在原始碼中沒有任何位置會修改,它們的值目前都是16(不太明白這個stamp是什麼意思)。
  • 紅黑樹相關欄位:
    • TREEIFY_THRESHOLD = 8:桶由連結串列轉換成紅黑樹結構的閾值,表示桶中的元素個數大於等於8個時將轉換成紅黑樹。
    • UNTREEIFY_THRESHOLD = 6: 桶由紅黑樹轉換成連結串列結構的閾值,表示桶中的元素個數小於等於6個時將轉換成連結串列。
    • MIN_TREEIFY_CAPACITY = 64:啟用紅黑樹的最小元素個數,當集合中的元素個數不足64時,即使某個桶中的元素已經達到8個,也只是執行擴容操作,而不是升級為紅黑樹。
  • 容量相關欄位:
    • MAXIMUM_CAPACITY = 1 << 30:最大容量
    • DEFAULT_CAPACITY = 16:預設容量
    • LOAD_FACTOR:載入因子,預設為0.75f
  • table: 存放容器元素的陣列物件
  • nextTable:平時為null,在擴容時指向擴容後的陣列。
  • baseCount:記錄元素個數,注意該計數器在多執行緒環境下不準,需要配合countCells使用。
  • counterCells:多執行緒環境下,用來暫時存放元素用。(???)
  • transferIndex:表示擴容時將資料從就陣列向新陣列遷移時的下標,多執行緒時根據該欄位給各個執行緒分配各自獨立的遷移區間,以實現多執行緒協作擴容。

3.核心方法

ConcurrentHashMap是用來儲存元素的,最常用的就是一些增刪改查方法,此外,在容量不足時,會自動觸發擴容操作。 接下來將對ConcurrentHashMap中的主要方法進行分析。

  • Unsafe類相關方法
    ConcurrentHashMap廢棄了分段鎖,改用 CAS + Synchronized + valatile保證執行緒安全,而Java主要通過Unsafe類實現CAS,因此原始碼大量使用了Unsafe類的三個CAS方法,如下:
   - compareAndSwapObject(Object o, long offset, Object expected, Object x);
   - compareAndSwapInt(Object o, long offset, int expected, int x);
   - compareAndSwapLong(Object o, long offset, long expected, long x);

這些方法非常相似,區別只是引數expectedx的型別。它們表達的意思是,如果物件ooffset位置的值是expected,則把值修改為x,否則不修改。其中o是給定的物件,offset表示物件記憶體偏移量,expected表示當前位置的期望值,x表示修改後的新值。
此外,ConcurrentHashMap封裝了三個陣列元素訪問方法,底層依然是呼叫Unsafe類:

    //從主存獲取tab[i],避免讀到髒資料
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //將tab[i]的值從c改成v
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    //將tab[i]的值v寫到主存
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • tableSizeFor
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

tableSizeFor的作用是計算第一個大於等於c2的整數冪,ConcurrentHashMap用這個方法計算得到的結果來作為內部陣列的長度。關於這個方法的介紹,網上已經有許多資料,這裡不再贅述,僅僅記錄下原始碼,因為程式碼確實太驚豔了,提醒自己要時常學習原始碼精髓。

  • 獲取元素(get(Object key))的原始碼如下:
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //計算key的hash值
        int h = spread(key.hashCode());
        //陣列長度大於0,且對應的桶不為空,其中(n-1) & h是計算雜湊值h對應的陣列位置
        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果頭結點就是目標節點,則返回該節點的值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //在紅黑樹或者nextTable中進行查詢
            //如果桶中的結構是紅黑樹,那麼root節點的hash值是-2,如果容器正在擴容,會把ForwardingNode節點放在桶裡作佔位符,這種型別的節點hash值為-1
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //走到這裡說明table裡都是正常的連結串列節點,按順序查詢即可
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法先進行一系列驗證,之後先判斷頭節點是不是key對應的節點,是就返回,否則檢查頭節點的hash值是不是負數,是負數的話就去紅黑樹或者擴容後的nextTable查詢,不是負數則說明key對應的桶裡是連結串列結構,則按順序查詢。其中方法spread方法用於計算keyhash值,其程式碼如下:

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

h ^ (h >>> 16) 是將h右移16位之後,再與h進行亦或,結果中高16位保持不變,低16位是儲存的是原來高16位和低16位的亦或結果。HASH_BITS的值是0x7fffffff,即2^31-1,只在spread方法裡用到,(h ^ (h >>> 16)) & HASH_BITS的結果相當於只是去掉了h ^ (h >>> 16)的符號位,後面在計算下標時再次與陣列長度作按位與操作,完整的下標計算相當於((h ^ (h >>> 16)) & HASH_BITS) & (n-1),由於n32位整數,其最大值也不會大於HASH_BITS,所以((h ^ (h >>> 16)) & HASH_BITS) & (n-1)的效果和(h ^ (h >>> 16)) & (n-1)一樣,似乎這裡並不需要HASH_BITS,和HashMap中的hash()方法保持一致不就行了?這裡留個疑問待以後解答。

  • 新增/修改元素
    put方法底層呼叫的putVal,原始碼如下:
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    //onlyIfAbsent表示key不存在才插入,存在則不更新
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //計算hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果陣列還沒初始化,則先初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //如果key對應的桶是空的,並且通過原子操作成功的將新節點插入桶中,則本次插入結束,轉入後續的addCount操作
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果當前有執行緒正在轉移資料,則幫助其轉移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    //這裡再次判斷頭結點有沒有發生變化,因為從上次賦值到加鎖期間,很可能有其他執行緒對tab[i]這個桶進行了操作
                    if (tabAt(tab, i) == f) {
                        //fh>=0表示桶內結構是連結串列
                        if (fh >= 0) {
                            //桶內元素計數器
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //找到key,則更新對應的值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //key不存在,則將新節點插入連結串列結尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //桶內結構是紅黑樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            //紅黑樹的binCount直接賦值為2,個人理解是root節點只佔位置,不儲存資料?留個疑問後面來解答。
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //桶內元素大於等於8,則轉換成紅黑樹
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //修改計數器的值,成功新增新元素才會走到這裡
        addCount(1L, binCount);
        return null;
    }

如果陣列沒有初始化,需要先執行initTable()進行初始化,從這裡可以看到,ConcurrentHashMap採用延遲初始化的策略,第一次新增元素的時候才對內部陣列進行初始化。initTable()原始碼如下:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl < 0 表示當前有其他執行緒正在執行初始化,呼叫Thread.yield()方法讓當前執行緒讓出CPU時間片,保證了只能有一個執行緒對陣列進行初始化
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //如果當前執行緒成功將sizeCtl的值從sc更新為-1,則由當前執行緒執行初始化操作,從這裡可以看出sizeCtl=-1表示當前正在執行初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //計算陣列容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //相當於sc = 0.75*n,即陣列長度*0.75,類似於HashMap中的threshold
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //初始化完成後,將sizeCtl的值更新成擴容閾值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
  • helpTransfer
    putVal方法中,一個比較有意思的地方在於,如果當前執行緒發現有其他執行緒正在進行資料轉移工作(即在擴容中),就幫助轉移資料,該方法原始碼如下:
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //先確認已初始化,再確認f是ForwardingNode型別節點,即當前確實有執行緒在執行擴容和遷移資料的操作,最後確認擴容後的新陣列是否已初始化完畢
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            //因為存在多執行緒擴容的情況,每次都需要重新判斷擴容條件是否還滿足
            //如果擴容已完成,那麼table=nextTable, nextTable=null,並且sizeCtl變成下次擴容的閾值,下面的三項檢查分別與這裡的情況對應
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //如果成功將sizeCtl的值加1,則進入transfer執行資料遷移工作,從這裡也可以看出,每當有新執行緒協助擴容時,會將sizeCtl的值加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

helpTransfer裡面最複雜的就是while迴圈體內的第一個條件判斷語句,接下來將一一進行分析。判斷條件裡頻繁出現rs這個變數,有必要先分析resizeStam方法,其原始碼非常簡單,只有一行:

    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

Integer.numberOfLeadingZeros(n)是計算當前陣列長度n的二進位制數表示中,最左邊的1之前有多少個0RESIZE_STAMP_BITS=16,假設n=16,其二進位制最左邊有270,那麼resizeStamp返回的結果的二進位制就是1000000000011011
瞭解了resizeStamp,接著再回到helpTransfer方法,從前面對sizeCtl的講解知道,在擴容前sizeCtl會被設定為sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,這是一個負數,那麼將sc無符號右移16位剛好得到resizeStamp(n),即rs變數的值。原始碼中先做(sc >>> RESIZE_STAMP_SHIFT) != rs的判斷,保證rssc這兩個對應的陣列長度相同,即當前擴容還沒結束。sc == rs + 1 || sc == rs + MAX_RESIZERS 這兩個判斷條件還沒看太懂,大概是限制擴容的執行緒不要超過最大允許執行緒數,但是具體為什麼不是太清楚,有待後續查證。transferIndex是資料遷移的位置變數,從後往前開始遷移資料,當transferIndex <= 0時,說明遷移已經結束了。

  • transfer
    ConcurrentHashMap的擴容實際上就是新建一個兩倍大的陣列,然後將老資料遷移到新陣列的過程,這也是transfer的字面意思。資料遷移的真正過程都在transfer方法裡,該方法總共一百多行,下面將結合原始碼一點點進行解析。
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //stride相當於一個步長變數,每個執行緒需要承擔stride個桶資料的遷移工作,這裡初始化的過程會參照機器CPU數量,在多CPU上stride=n/(8*NCPU),單CPU上stride=n,但是stride不能小於16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //如果新陣列還沒初始化,在這裡進行初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //新建兩倍長度的陣列
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //nextTab欄位在這裡才不為null,而是指向新陣列
            nextTable = nextTab;
            //資料遷移的下標,從後往前遷移
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //是否可以開始遷移下一個桶的識別符號,當前桶遷移完畢才可以接著遷移前一個桶的資料
        boolean advance = true;
        //記錄當前遷移工作是否結束
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //如果當前桶已遷移完畢,開始處理下一個桶
            while (advance) {
                int nextIndex, nextBound;
                //這裡遷移的桶區間是[bound,i-1],因此--i>=bound說明當前執行緒的遷移任務還沒結束,需要跳出while迴圈,執行後面的遷移工作
                if (--i >= bound || finishing)
                    advance = false;
                //如果所有的桶都已經分給了相應的執行緒進行處理,沒有多餘的桶給當前執行緒了,將i設定為-1,讓當前執行緒退出遷移操作
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //如果當前執行緒成功分到[nextIndex-stride,nextIndex-1]的區間,則可以開始進行資料遷移了,在還有多餘的桶沒有分配時,新加入進來的擴容執行緒會先執行到這裡領取任務,下一個執行緒進來將會接著從nextIndex-stride往前遷移stride個桶的資料
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //當出現下面這些情況時,說明擴容已結束,需要做一些善後工作
            //i<0的情況在上面的while迴圈裡出現過,但是i >= n 和 i + n >= nextn這兩個條件會在什麼情況下出現呢???
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //如果所有的資料遷移都已完成,則執行這裡的邏輯退出
                if (finishing) {
                    //nextTable重新賦值為null
                    nextTable = null;
                    //table指向新陣列,原來的陣列將被GC回收
                    table = nextTab;
                    //sizeCtl = 2*n-0.5*n=1.5*n,這個值實際上就是新陣列長度的0.75倍,即下一次擴容的閾值。這裡的做法很巧妙,n是原陣列的長度,擴容後的長度是2*n,按照0.75的載入因子來算,新陣列的擴容閾值就是2*n*0.75=1.5*n,但位運算顯然更快一些
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //如果當前執行緒的遷移工作已完成,就將sizeCtl的值減1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //如果sizeCtl的值變成了擴容前的值(即resizeStamp(n) << RESIZE_STAMP_SHIFT + 2),說明擴容完成
                    //問題:上面的原子操作是現將修改前的sizeCtl賦值給sc,然後才將sizeCtl減1,那麼sc應該永遠取不到resizeStamp(n) << RESIZE_STAMP_SHIFT + 2才對,下面的return語句不會執行,這裡是不是哪裡理解得不對???留待以後驗證。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //走到這裡說明所有的遷移工作都完成了,設定相關欄位
                    finishing = advance = true;
                    //將i設定為n,那麼執行緒將會接著執行最外層for迴圈,從後向前逐個檢查是否每個桶都已完成資料遷移
                    i = n; // recheck before commit
                }
            }
            //如果原來的桶裡沒有資料,插入一個ForwardingNode作佔位符,告訴其他執行緒當前正在進行擴容
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //這個判斷緊接著上一個判斷,如果已經有佔位符了,說明有執行緒已經處理過這個桶了,不能再處理這個桶
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //當前桶既不是佔位符也不是空(即桶裡面是連結串列或紅黑樹),會走到這裡
            else {
                //遷移桶內資料需要加鎖,避免其他執行緒同時增加、刪除或修改桶裡的資料
                synchronized (f) {
                    //從上一次取桶的頭節點到加鎖之前,可能該桶已經被其他執行緒處理過,需要進行二次判斷
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //桶裡是連結串列結構
                        if (fh >= 0) {
                            //這裡值得注意,回憶一下上面對put方法的分析,插入元素時,元素的hash值是根據spread(key.hashCode())進行計算的,hash值在擴容期間肯定不會變,變化的僅僅是陣列的長度,而陣列的長度在擴容後會左移1位,因此元素在新陣列中的位置就由hash值從低位開始的第n位決定,如果hash值第n位是0,則元素還在下標為i的這個桶裡,否則元素的新位置是i+n。這裡的處理邏輯與HashMap中一樣,都是為了降低計算新位置的開銷。
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //這個for迴圈是為了避免新建不必要的節點,即經過計算髮現,如果當前連結串列的某個元素a及其後面的所有節點都在同一個桶內,那麼在進行資料轉移時,只需要處理a節點之前的節點即可,a節點及其之後的節點仍然維持原來的連結串列結構放在新陣列的對應桶裡,這裡的lastRun就是待確定的a節點
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //如果最後幾個節點都呆在原來的桶裡,則設定ln指向lastRun節點,否則將hn指向lastRun節點
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //這裡才是真正的資料轉移過程,跟前面的分析一樣,從頭結點處理到lastRun節點即可。
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //以下的邏輯表明,資料遷移是採用在連結串列頭部插入資料的做法,lastRun之前的節點順序會被反轉
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //ln對應老的桶下標i,hn對應新的桶下標i+n
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //在元素遷移完之後,會將桶的頭結點設定為ForwardingNode佔位節點
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //桶內是紅黑樹結構,邏輯與連結串列大同小異,不再贅述
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

終於分析完transfer的邏輯了~~~現在來整理一下需要注意的地方。首先需要注意的是ForwardingNode節點則這段程式碼裡有三處地方會用到,一是在最外層的for迴圈的邏輯中,當發現桶是null時,會將ForwardingNode節點插入,實際上這種情況也可以視為這個桶資料已遷移完成,另外兩處都是在遷移完資料之後,將ForwardingNode插入作佔位符,因此當遍歷元素遇到佔位符時,都表明當前正在擴容,且這個桶裡的資料已經遷移完了;二是資料遷移的過程中,連結串列首部的節點會變成倒序。
分析到這裡,put->initTable->helpTransfer->transfer這條線已經分析完了,put方法最後還遺留了一個addCount方法沒有分析,這部分內容跟容器計數有關,先看看addCount的原始碼:

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //這部分跟計數有關
        //滿足if判斷的條件:①counterCells!=null ② counterCells=null,且併發修改baseCount的值失敗,說明當前有其他執行緒也在修改baseCount
        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //這個判斷是什麼意思目前不太清楚???留給後面確認並補充。
            if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            //counterCells!=null,並且counterCells中有資料,並且下標a對應的元素不是null,且成功的將CELLVALUE從a.value修改成a.value+x,才會執行到這裡
            if (check <= 1)
                return;
            //計算元素個數
            s = sumCount();
        }
        //這部分跟擴容有關
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //s>=sizeCtl表示達到擴容閾值,需要進行擴容,條件是陣列已經初始化並且沒有達到最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //sc < 0 表示擴容已開始
                if (sc < 0) {
                    //這裡的判斷跟helpTransfer方法一樣,不再贅述
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //當前執行緒加入擴容工作
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //走到這裡說明擴容工作還沒開始,由當前執行緒開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

這段程式碼裡出現了counterCells欄位,但是到目前為止,並沒有看到在哪裡對這個欄位進行了賦值,全域性搜尋發現,該欄位只在fullAddCount方法裡被賦值,而fullAddCount方法也出現在addCount中,因此接下來將分析一下fullAddCount方法。

    //See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //如果ThreadLocalRandom還沒有初始化,這裡先進行初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //這裡會自旋
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //如果counterCells已經初始化
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //這裡的原子操作限制了每次只能有一個執行緒執行到此處
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                //如果counterCells在h對應的索引位置還沒初始化,則初始化
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //如果counterCells還沒初始化
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        //counterCells要求長度必須是2的整數冪,因此先初始化為2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

從原始碼的註釋可以看到,fullAddCount借鑑了LongAdder的思想,因此原始碼並沒有在這裡給出詳細解釋。本人對這塊也是一知半解,因此上面僅僅是把理解的內容進行了分析,而對於addCountfullAddCount的實現原理並沒有透徹的理解,這部分後面需要補充,此處就不再進行額外的解讀,以免誤人子弟。

  • tryPresize
    跟擴容有關的方法還有tryPresize,這個方法在ConcurrentHashMap中只有兩個地方呼叫,一個是putAll方法,另一個是treeifyBin。原始碼如下:
    private final void tryPresize(int size) {
        //計算擴容後的陣列長度
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //如果陣列還沒有初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                //將sizeCtl設定為-1,開始執行初始化操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //如果當前元素個數沒有達到擴容閾值(c<=sc),或者陣列長度已經到最大值了,不需要擴容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                //擴容已開始
                if (sc < 0) {
                    Node<K,V>[] nt;
                    //如果擴容已結束或者擴容執行緒已達到最大,當前執行緒什麼也不幹
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //當前執行緒加入擴容工作
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //擴容還沒開始,就由當前執行緒開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
  • 統計容器內元素個數
    ConcurrentHashMap中,統計元素個數應該使用sumCount()而不是size()方法,原因在於:方法返回的是int型別,而實際上concurrentHashMap儲存的元素可以超過這個範圍,sumCount方法的返回值為long型別可以說明這一點,其原始碼如下:
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        //元素數量是baseCount和counterCells中儲存的元素數量之和
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

4.問題整理

在閱讀程式碼的過程中,仍然有些地方的邏輯不是很懂,在這裡將這些問題記錄下來,方便查閱和後期補充。

  • counterCells欄位是用來幹什麼的?
  • spread方法裡為何要和HASH_BITS作按位與計算?
  • putVal方法中,當發現桶中的資料結構是紅黑樹時,binCount為何直接賦值為2
  • transfer方法中,for迴圈體重的if條件語句中,什麼情況下會滿足i >= ni + n >= nextn這兩個條件?
  • transfer方法中,下面的if語句似乎永遠為false,怎麼理解?
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
  • addCountfullAddCount方法的原理還不是太理解,需要再研究研究。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFT這兩個欄位的作用不太瞭解,以及sizeCtl裡面是哪部分在記錄stamp
  • ⑦原始碼中在修改sizeCtl的值時,有時候直接使用sizeCtl,有時候又使用類似於U.compareAndSwapInt(this, SIZECTL, sc, -1)這種,兩者有何區別?
  • ⑧原始碼中多次出現if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)這種判斷,其中的sc == rs + 1 || sc == rs + MAX_RESIZERS該如何理解?

5.總結

讀原始碼的過程是痛苦的,尤其是剛開始閱讀的時候,有許許多多東西都看不懂,以至於根本不知道從哪裡看起,然而一旦能夠看下去了,會發現其實原始碼並不是太可怕,遇到的困難都是可以克服的。經過JDK 1.8之後,ConcurrentHashMap的原始碼的行數達到了6000+,本文只是記錄了ConcurrentHashMap非常有限的內容,還有許多內容並未觸碰,因此可以說本人對於ConcurrentHashMap的瞭解也非常有限。最開始讀原始碼時,並沒有想過要寫一篇部落格來記載,但是很快發現,如果不記下來,過不了多久就會把已經看過的內容忘個七七八八,因此才動了寫篇讀後感的念頭。當然,在寫作的過程中,自己的思路也捋順了許多,也算是個額外的收穫。

6.參考文獻

從開始看原始碼到寫完本文,其實看了不少前人的優秀文章,這裡先佔個坑位,後面會把相關參考資料整理到這裡