1. 程式人生 > >為並發而生的 ConcurrentHashMap(Java 8)

為並發而生的 ConcurrentHashMap(Java 8)

內部 cti 下一個 count() java 8 插入 thread nis 構造函數

HashMap 是我們日常最常見的一種容器,它以鍵值對的形式完成對數據的存儲,但眾所周知,它在高並發的情境下是不安全的。尤其是在 jdk 1.8 之前,rehash 的過程中采用頭插法轉移結點,高並發下,多個線程同時操作一條鏈表將直接導致閉鏈,死循環並占滿 CPU。

當然,jdk 1.8 以來,對 HashMap 的內部進行了很大的改進,采用數組+鏈表+紅黑樹來進行數據的存儲。rehash 的過程也進行了改動,基於復制的算法思想,不直接操作原鏈,而是定義了兩條鏈表分別完成對原鏈的結點分離操作,即使是多線程的情況下也是安全的。當然,它畢竟不是並發容器,除非大改,否則依然是不能應對高並發場景的,或者說即使沒有因多線程訪問而造成什麽問題,但是效率必然是受到影響的。例如在多線程同時添加元素的時候可能會丟失數據,叠代 map 的時候發生 fail-fast 等。

本篇文章將要介紹的 ConcurrentHashMap 是 HashMap 的並發版本,它是線程安全的,並且在高並發的情境下,性能優於 HashMap 很多。我們主要從以下幾個方面對其進行學習:

  • 歷史版本的實現演變
  • 重要成員屬性的介紹
  • put 方法實現並發添加
  • remove 方法實現並發刪除
  • 其他的一些方法的簡單介紹

一、歷史版本的實現演變

jdk 1.7 采用分段鎖技術,整個 Hash 表被分成多個段,每個段中會對應一個 Segment 段鎖,段與段之間可以並發訪問,但是多線程想要操作同一個段是需要獲取鎖的。所有的 put,get,remove 等方法都是根據鍵的 hash 值對應到相應的段中,然後嘗試獲取鎖進行訪問。

技術分享圖片

jdk 1.8 取消了基於 Segment 的分段鎖思想,改用 CAS + synchronized 控制並發操作,在某些方面提升了性能。並且追隨 1.8 版本的 HashMap 底層實現,使用數組+鏈表+紅黑樹進行數據存儲。本篇主要介紹 1.8 版本的 ConcurrentHashMap 的具體實現,有關其之前版本的實現情況,這裏推薦幾篇文章:

談談ConcurrentHashMap1.7和1.8的不同實現
ConcurrentHashMap在jdk1.8中的改進
ConcurrentHashMap原理分析(1.7與1.8)

二、重要成員屬性的介紹

transient volatile Node<K,V>[] table;

和 HashMap 中的語義一樣,代表整個哈希表。

/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;

這是一個連接表,用於哈希表擴容,擴容完成後會被重置為 null。

private transient volatile long baseCount;

該屬性保存著整個哈希表中存儲的所有的結點的個數總和,有點類似於 HashMap 的 size 屬性。

private transient volatile int sizeCtl;

這是一個重要的屬性,無論是初始化哈希表,還是擴容 rehash 的過程,都是需要依賴這個關鍵屬性的。該屬性有以下幾種取值:

  • 0:默認值
  • -1:代表哈希表正在進行初始化
  • 大於0:相當於 HashMap 中的 threshold,表示閾值
  • 小於-1:代表有多個線程正在進行擴容

該屬性的使用還是有點復雜的,在我們分析擴容源碼的時候再給予更加詳盡的描述,此處了解其可取的幾個值都分別代表著什麽樣的含義即可。

構造函數的實現也和我們上篇介紹的 HashMap 的實現類似,此處不再贅述,貼出源碼供大家比較。

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

其他常用的方法我們將在文末進行簡單介紹,下面我們主要來分析下 ConcurrentHashMap 的一個核心方法 put,我們也會一並解決掉該方法中涉及到的擴容、輔助擴容,初始化哈希表等方法。

三、put 方法實現並發添加

對於 HashMap 來說,多線程並發添加元素會導致數據丟失等並發問題,那麽 ConcurrentHashMap 又是如何做到並發添加的呢?

public V put(K key, V value) {
    return putVal(key, value, false);
}

putVal 的方法比較多,我們分兩個部分進行分析。

//第一部分
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //對傳入的參數進行合法性判斷
    if (key == null || value == null) throw new NullPointerException();
    //計算鍵所對應的 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果哈希表還未初始化,那麽初始化它
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //根據鍵的 hash 值找到哈希數組相應的索引位置
        //如果為空,那麽以CAS無鎖式向該位置添加一個節點
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   
        }

這裏需要詳細說明的只有 initTable 方法,這是一個初始化哈希表的操作,它同時只允許一個線程進行初始化操作。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //如果表為空才進行初始化操作
    while ((tab = table) == null || tab.length == 0) {
        //sizeCtl 小於零說明已經有線程正在進行初始化操作
        //當前線程應該放棄 CPU 的使用
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //否則說明還未有線程對表進行初始化,那麽本線程就來做這個工作
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //保險起見,再次判斷下表是否為空
            try {
                if ((tab = table) == null || tab.length == 0) {
                    //sc 大於零說明容量已經初始化了,否則使用默認容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //根據容量構建數組
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //計算閾值,等效於 n*0.75
                    sc = n - (n >>> 2);
                }
            } finally {
                //設置閾值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

關於 initTable 方法的每一步實現都已經給出註釋,該方法的核心思想就是,只允許一個線程對表進行初始化,如果不巧有其他線程進來了,那麽會讓其他線程交出 CPU 等待下次系統調度。這樣,保證了表同時只會被一個線程初始化。

接著,我們回到 putVal 方法,這樣的話,我們第一部分的 putVal 源碼就分析結束了,下面我們看後一部分的源碼:

//檢測到桶結點是 ForwardingNode 類型,協助擴容
else if ((fh = f.hash) == MOVED)
     tab = helpTransfer(tab, f);
//桶結點是普通的結點,鎖住該桶頭結點並試圖在該鏈表的尾部添加一個節點
else {
       V oldVal = null;
       synchronized (f) {
           if (tabAt(tab, i) == f) {
              //向普通的鏈表中添加元素,無需贅述
              if (fh >= 0) {
                 binCount = 1;
                 for (Node<K,V> e = f;; ++binCount) {
                     K ek;
                     if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
                         oldVal = e.val;
                         if (!onlyIfAbsent)
                            e.val = value;
                            break;
                      }
                      Node<K,V> pred = e;
                      if ((e = e.next) == null) {
                         pred.next = new Node<K,V>(hash, key,value, null);
                         break;
                      }
                 }
           }
           //向紅黑樹中添加元素,TreeBin 結點的hash值為TREEBIN(-2)
           else if (f instanceof TreeBin) {
               Node<K,V> p;
               binCount = 2;
                 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,                                      value)) != null) {
                   oldVal = p.val;
                   if (!onlyIfAbsent)
                      p.val = value;
                }
           }
       }
   }
  //binCount != 0 說明向鏈表或者紅黑樹中添加或修改一個節點成功
  //binCount  == 0 說明 put 操作將一個新節點添加成為某個桶的首節點
  if (binCount != 0) {
         //鏈表深度超過 8 轉換為紅黑樹
         if (binCount >= TREEIFY_THRESHOLD)
             treeifyBin(tab, i);
         //oldVal != null 說明此次操作是修改操作
         //直接返回舊值即可,無需做下面的擴容邊界檢查
         if (oldVal != null)
             return oldVal;
           break;
        }
    }
}
//CAS 式更新baseCount,並判斷是否需要擴容
addCount(1L, binCount);
//程序走到這一步說明此次 put 操作是一個添加操作,否則早就 return 返回了
return null;

這一部分的源碼大體上已如註釋所描述,至此整個 putVal 方法的大體邏輯實現相信你也已經清晰了,好好回味一下。下面我們對這部分中的某些方法的實現細節再做一些深入學習。

首先需要介紹一下,ForwardingNode 這個節點類型,

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //註意這裏
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    //省略其 find 方法
}

這個節點內部保存了一 nextTable 引用,它指向一張 hash 表。在擴容操作中,我們需要對每個桶中的結點進行分離和轉移,如果某個桶結點中所有節點都已經遷移完成了(已經被轉移到新表 nextTable 中了),那麽會在原 table 表的該位置掛上一個 ForwardingNode 結點,說明此桶已經完成遷移。

ForwardingNode 繼承自 Node 結點,並且它唯一的構造函數將構建一個鍵,值,next 都為 null 的結點,反正它就是個標識,無需那些屬性。但是 hash 值卻為 MOVED。

所以,我們在 putVal 方法中遍歷整個 hash 表的桶結點,如果遇到 hash 值等於 MOVED,說明已經有線程正在擴容 rehash 操作,整體上還未完成,不過我們要插入的桶的位置已經完成了所有節點的遷移。

由於檢測到當前哈希表正在擴容,於是讓當前線程去協助擴容。

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        //返回一個 16 位長度的擴容校驗標識
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //sizeCtl 如果處於擴容狀態的話
            //前 16 位是數據校驗標識,後 16 位是當前正在擴容的線程總數
            //這裏判斷校驗標識是否相等,如果校驗符不等或者擴容操作已經完成了,直接退出循環,不用協助它們擴容了
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //否則調用 transfer 幫助它們進行擴容
            //sc + 1 標識增加了一個線程進行擴容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

下面我們看這個稍顯復雜的 transfer 方法,我們依然分幾個部分來細說。

//第一部分
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //計算單個線程允許處理的最少table桶首節點個數,不能小於 16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; 
        //剛開始擴容,初始化 nextTab 
        if (nextTab == null) {
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //transferIndex 指向最後一個桶,方便從後向前遍歷 
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //定義 ForwardingNode 用於標記遷移完成的桶
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

這部分代碼還是比較簡單的,主要完成的是對單個線程能處理的最少桶結點個數的計算和一些屬性的初始化操作。

//第二部分,並發擴容控制的核心
boolean advance = true;
boolean finishing = false;
//i 指向當前桶,bound 指向當前線程需要處理的桶結點的區間下限
for (int i = 0, bound = 0;;) {
       Node<K,V> f; int fh;
       //這個 while 循環的目的就是通過 --i 遍歷當前線程所分配到的桶結點
       //一個桶一個桶的處理
       while (advance) {
           int nextIndex, nextBound;
           if (--i >= bound || finishing)
               advance = false;
           //transferIndex <= 0 說明已經沒有需要遷移的桶了
           else if ((nextIndex = transferIndex) <= 0) {
               i = -1;
               advance = false;
           }
           //更新 transferIndex
           //為當前線程分配任務,處理的桶結點區間為(nextBound,nextIndex)
           else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
               bound = nextBound;
               i = nextIndex - 1;
               advance = false;
           }
       }
       //當前線程所有任務完成
       if (i < 0 || i >= n || i + n >= nextn) {
           int sc;
           if (finishing) {
               nextTable = null;
               table = nextTab;
               sizeCtl = (n << 1) - (n >>> 1);
               return;
           }
           if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
               if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                   return;
               finishing = advance = true;
               i = n; 
           }
       }
       //待遷移桶為空,那麽在此位置 CAS 添加 ForwardingNode 結點標識該桶已經被處理過了
       else if ((f = tabAt(tab, i)) == null)
           advance = casTabAt(tab, i, null, fwd);
       //如果掃描到 ForwardingNode,說明此桶已經被處理過了,跳過即可
       else if ((fh = f.hash) == MOVED)
           advance = true; 

每個新參加進來擴容的線程必然先進 while 循環的最後一個判斷條件中去領取自己需要遷移的桶的區間。然後 i 指向區間的最後一個位置,表示遷移操作從後往前的做。接下來的幾個判斷就是實際的遷移結點操作了。等我們大致介紹完成第三部分的源碼再回來對各個判斷條件下的遷移過程進行詳細的敘述。

//第三部分
else {
    //
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            //鏈表的遷移操作
            if (fh >= 0) {
                int runBit = fh & n;
                Node<K,V> lastRun = f;
                //整個 for 循環為了找到整個桶中最後連續的 fh & n 不變的結點
                for (Node<K,V> p = f.next; p != null; p = p.next) {
                    int b = p.hash & n;
                    if (b != runBit) {
                        runBit = b;
                        lastRun = p;
                    }
                }
                if (runBit == 0) {
                    ln = lastRun;
                    hn = null;
                }
                else {
                    hn = lastRun;
                    ln = null;
                }
                //如果fh&n不變的鏈表的runbit都是0,則nextTab[i]內元素ln前逆序,ln及其之後順序
                //否則,nextTab[i+n]內元素全部相對原table逆序
                //這是通過一個節點一個節點的往nextTab添加
                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    if ((ph & n) == 0)
                        ln = new Node<K,V>(ph, pk, pv, ln);
                    else
                        hn = new Node<K,V>(ph, pk, pv, hn);
                }
                //把兩條鏈表整體遷移到nextTab中
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                //將原桶標識位已經處理
                setTabAt(tab, i, fwd);
                advance = true;
            }
            //紅黑樹的復制算法,不再贅述
            else if (f instanceof TreeBin) {
                TreeBin<K,V> t = (TreeBin<K,V>)f;
                TreeNode<K,V> lo = null, loTail = null;
                TreeNode<K,V> hi = null, hiTail = null;
                int lc = 0, hc = 0;
                for (Node<K,V> e = t.first; e != null; e = e.next) {
                    int h = e.hash;
                    TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
                    if ((h & n) == 0) {
                        if ((p.prev = loTail) == null)
                            lo = p;
                        else
                            loTail.next = p;
                    loTail = p;
                    ++lc;
                    }
                    else {
                        if ((p.prev = hiTail) == null)
                            hi = p;
                        else
                            hiTail.next = p;
                    hiTail = p;
                    ++hc;
                    }
                }
                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;
                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd);
                advance = true;
           }

那麽至此,有關遷移的幾種情況已經介紹完成了,下面我們整體上把控一下整個擴容和遷移過程。

首先,每個線程進來會先領取自己的任務區間,然後開始 --i 來遍歷自己的任務區間,對每個桶進行處理。如果遇到桶的頭結點是空的,那麽使用 ForwardingNode 標識該桶已經被處理完成了。如果遇到已經處理完成的桶,直接跳過進行下一個桶的處理。如果是正常的桶,對桶首節點加鎖,正常的遷移即可,遷移結束後依然會將原表的該位置標識位已經處理。

當 i < 0,說明本線程處理速度夠快的,整張表的最後一部分已經被它處理完了,現在需要看看是否還有其他線程在自己的區間段還在遷移中。這是退出的邏輯判斷部分:

技術分享圖片

finnish 是一個標誌,如果為 true 則說明整張表的遷移操作已經全部完成了,我們只需要重置 table 的引用並將 nextTable 賦為空即可。否則,CAS 式的將 sizeCtl 減一,表示當前線程已經完成了任務,退出擴容操作。

如果退出成功,那麽需要進一步判斷是否還有其他線程仍然在執行任務。

if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;

我們說過 resizeStamp(n) 返回的是對 n 的一個數據校驗標識,占 16 位。而 RESIZE_STAMP_SHIFT 的值為 16,那麽位運算後,整個表達式必然在右邊空出 16 個零。也正如我們所說的,sizeCtl 的高 16 位為數據校驗標識,低 16 為表示正在進行擴容的線程數量。

(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當前只有一個線程正在工作,相對應的,如果 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,說明當前線程就是最後一個還在擴容的線程,那麽會將 finishing 標識為 true,並在下一次循環中退出擴容方法。

這一塊的難點在於對 sizeCtl 的各個值的理解,關於它的深入理解,這裏推薦一篇文章。

著重理解位操作

看到這裏,真的為 Doug Lea 精妙的設計而折服,針對於多線程訪問問題,不但沒有拒絕式得將他們阻塞在門外,反而邀請他們來幫忙一起工作。

好了,我們一路往回走,回到我們最初分析的 putVal 方法。接著前文的分析,當我們根據 hash 值,找到對應的桶結點,如果發現該結點為 ForwardingNode 結點,表明當前的哈希表正在擴容和 rehash,於是將本線程送進去幫忙擴容。否則如果是普通的桶結點,於是鎖住該桶,分鏈表和紅黑樹的插入一個節點,具體插入過程類似 HashMap,此處不再贅述。

當我們成功的添加完成一個結點,最後是需要判斷添加操作後是否會導致哈希表達到它的閾值,並針對不同情況決定是否需要進行擴容,還有 CAS 式更新哈希表實際存儲的鍵值對數量。這些操作都封裝在 addCount 這個方法中,當然 putVal 方法的最後必然會調用該方法進行處理。下面我們看看該方法的具體實現,該方法主要做兩個事情。一是更新 baseCount,二是判斷是否需要擴容。

//第一部分,更新 baseCount
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //如果更新失敗才會進入的 if 的主體代碼中
    //s = b + x  其中 x 等於 1
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        //高並發下 CAS 失敗會執行 fullAddCount 方法
        if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }

這一部分主要完成的是對 baseCount 的 CAS 更新。

//第二部分,判斷是否需要擴容
if (check >= 0) {
     Node<K,V>[] tab, nt; int n, sc;
     while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {
          int rs = resizeStamp(n);
          if (sc < 0) {
             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)
               break;
               if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                  transfer(tab, nt);
               }
           else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
               s = sumCount();
            }
    }

這部分代碼也是比較簡單的,不再贅述。

至此,對於 put 方法的源碼分析已經完全結束了,很復雜但也很讓人欽佩。下面我們簡單看看 remove 方法的實現。

四、remove 方法實現並發刪除

在我們分析完 put 方法的源碼之後,相信 remove 方法對你而言就比較輕松了,無非就是先定位再刪除的復合。

限於篇幅,我們這裏簡單的描述下 remove 方法的並發刪除過程。

首先遍歷整張表的桶結點,如果表還未初始化或者無法根據參數的 hash 值定位到桶結點,那麽將返回 null。

如果定位到的桶結點類型是 ForwardingNode 結點,調用 helpTransfer 協助擴容。

否則就老老實實的給桶加鎖,刪除一個節點。

最後會調用 addCount 方法 CAS 更新 baseCount 的值。

五、其他的一些常用方法的基本介紹

最後我們在補充一些 ConcurrentHashMap 中的小而常用的方法的介紹。

1、size
size 方法的作用是為我們返回哈希表中實際存在的鍵值對的總數。

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

可能你會有所疑問,ConcurrentHashMap 中的 baseCount 屬性不就是記錄的所有鍵值對的總數嗎?直接返回它不就行了嗎?

之所以沒有這麽做,是因為我們的 addCount 方法用於 CAS 更新 baseCount,但很有可能在高並發的情況下,更新失敗,那麽這些節點雖然已經被添加到哈希表中了,但是數量卻沒有被統計。

還好,addCount 方法在更新 baseCount 失敗的時候,會調用 fullAddCount 將這些失敗的結點包裝成一個 CounterCell 對象,保存在 CounterCell 數組中。那麽整張表實際的 size 其實是 baseCount 加上 CounterCell 數組中元素的個數。

2、get
get 方法可以根據指定的鍵,返回對應的鍵值對,由於是讀操作,所以不涉及到並發問題。源碼也是比較簡單的。

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

3、clear
clear 方法將刪除整張哈希表中所有的鍵值對,刪除操作也是一個桶一個桶的進行刪除。

public void clear() {
    long delta = 0L; // negative number of deletions
    int i = 0;
    Node<K,V>[] tab = table;
    while (tab != null && i < tab.length) {
        int fh;
        Node<K,V> f = tabAt(tab, i);
        if (f == null)
            ++i;
        else if ((fh = f.hash) == MOVED) {
            tab = helpTransfer(tab, f);
            i = 0; // restart
        }
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                        //循環到鏈表或者紅黑樹的尾部
                        while (p != null) {
                            --delta;
                            p = p.next;
                        }
                        //首先刪除鏈、樹的末尾元素,避免產生大量垃圾  
                        //利用CAS無鎖置null  
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        if (delta != 0L)
            addCount(delta, -1);
    }

到此為止,有關這個為並發而生的 ConcurrentHashMap 內部的核心的部分,我們已經通過源碼進行了分析。確實挺費腦,再次膜拜下 jdk 大神們的智慧。總結不到之處,望指出!

為並發而生的 ConcurrentHashMap(Java 8)