1. 程式人生 > >jdk1.8中ConcurrentHashMap的實現原理

jdk1.8中ConcurrentHashMap的實現原理

併發環境下為什麼使用ConcurrentHashMap

1. HashMap在高併發的環境下,執行put操作會導致HashMap的Entry連結串列形成環形資料結構,從而導致Entry的next節點始終不為空,因此產生死迴圈獲取Entry

2. HashTable雖然是執行緒安全的,但是效率低下,當一個執行緒訪問HashTable的同步方法時,其他執行緒如果也訪問HashTable的同步方法,那麼會進入阻塞或者輪訓狀態。

3. 在jdk1.6中ConcurrentHashMap使用鎖分段技術提高併發訪問效率。首先將資料分成一段一段地儲存,然後給每一段資料配一個鎖,當一個執行緒佔用鎖訪問其中一段資料時,其他段的資料也能被其他執行緒訪問。然而在jdk1.8中的實現已經拋棄了Segment分段鎖機制,利用CAS+Synchronized來保證併發更新的安全,底層依然採用陣列+連結串列+紅黑樹的儲存結構。

JDK1.6分析

ConcurrentHashMap採用 分段鎖的機制,實現併發的更新操作,底層由Segment陣列和HashEntry陣列組成。Segment繼承ReentrantLock用來充當鎖的角色,每個 Segment 物件守護每個雜湊對映表的若干個桶。HashEntry 用來封裝對映表的鍵 / 值對;每個桶是由若干個 HashEntry 物件連結起來的連結串列。一個 ConcurrentHashMap 例項中包含由若干個 Segment 物件組成的陣列,下面我們通過一個圖來演示一下 ConcurrentHashMap 的結構:

這裡寫圖片描述

JDK1.8分析

改進一:取消segments欄位,直接採用transient volatile HashEntry<K,V> table

儲存資料,採用table陣列元素作為鎖,從而實現了對每一行資料進行加鎖,進一步減少併發衝突的概率。

改進二:將原先table陣列+單向連結串列的資料結構,變更為table陣列+單向連結串列+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之後能均勻的分佈在陣列中。如果hash之後雜湊的很均勻,那麼table陣列中的每個佇列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類預設的載入因子為0.75,但是在資料量過大或者運氣不佳的情況下,還是會存在一些佇列長度過長的情況,如果還是採用單向列表方式,那麼查詢某個節點的時間複雜度為O(n);因此,對於個數超過8(預設值)的列表,jdk1.8中採用了紅黑樹的結構,那麼查詢的時間複雜度可以降低到O(logN),可以改進效能。

ConcurrentHashMap的重要屬性

/**
 * races. Updated via CAS.
 * 記錄容器的容量大小,通過CAS更新
 */
 private static final long BASECOUNT;

/**
 * 這個sizeCtl是volatile的,那麼他是執行緒可見的,一個思考:它是所有修改都在CAS中進行,但是sizeCtl為什麼不設計成LongAdder(jdk8出現的)型別呢?
 * 或者設計成AtomicLong(在高併發的情況下比LongAdder低效),這樣就能減少自己操作CAS了。
 *
 * 預設為0,用來控制table的初始化和擴容操作,具體應用在後續會體現出來。
 * -1 代表table正在初始化
 * -N 表示有N-1個執行緒正在進行擴容操作
 * 其餘情況:
 *1、如果table未初始化,表示table需要初始化的大小。
 *2、如果table初始化完成,表示table的容量,預設是table大小的0.75 倍,居然用這個公式算0.75(n - (n >>> 2))。
 **/
private static final long SIZECTL;

/**
 *  自旋鎖 (鎖定通過 CAS) 在調整大小和/或建立 CounterCells 時使用。 在CounterCell類更新value中會使用,功能類似顯示鎖和內建鎖,效能更好
 *  在Striped64類也有應用
 */
 private static final long CELLSBUSY;

Node:儲存key,value及key的hash值的資料結構。其中value和next都用volatile修飾,保證併發的可見性。

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;//volatile型別的
        volatile Node<K,V> next;//volatile型別的


        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        //省略部分程式碼
       }

ForwardingNode:一個特殊的Node節點,hash值為-1,其中儲存nextTable的引用。

    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //省略部分程式碼
        }

ConcurrentHashMap的建構函式

    //預設的建構函式
    public ConcurrentHashMap(){}

    /**
    *initialCapacity 初始化容量
    **/
    public ConcurrentHashMap(int initialCapacity) {}

    /**
    *
    *建立與給定map具有相同對映的新map
    **/
    public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
   /**
    *initialCapacity 初始容量
    *loadFactor 負載因子,當容量達到initialCapacity*loadFactor時,執行擴容
    *concurrencyLevel 預估的併發更新執行緒數
    **/
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {}

    /**
    *initialCapacity 初始容量
    *loadFactor 負載因子
    *concurrencyLevel 預估的併發更新執行緒數
    **/
     public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {}

接下來具體看看第四個建構函式的具體實現:

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   //至少使用盡可能多的bin
            initialCapacity = concurrencyLevel;   //作為估計執行緒
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;//初始化sizeCtl
    }
    /**
    *返回給定所需容量,table的大小總是2的冪次方
    **/
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

ConcurrentHashMap在建構函式中只會初始化sizeCtl值,並不會直接初始化table,而是延緩到第一次put操作

put()方法的實現

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());//對hashCode進行再雜湊,演算法為(h ^ (h >>> 16)) & HASH_BITS
    int binCount = 0;
 //這邊加了一個迴圈,就是不斷的嘗試,因為在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
    //因為如果其他執行緒正在修改tab,那麼嘗試就會失敗,所以這邊要加一個for迴圈,不斷的嘗試
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果table為空,初始化;否則,根據hash值計算得到陣列索引i,如果tab[i]為空,直接新建節點Node即可。注:tab[i]實質為連結串列或者紅黑樹的首節點。
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果tab[i]不為空並且hash值為MOVED(-1),說明該連結串列正在進行transfer操作,返回擴容完成後的table。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 針對首個節點進行加鎖操作,而不是segment,進一步減少執行緒衝突
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果在連結串列中找到值為key的節點e,直接設定e.val = value即可。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 如果沒有找到值為key的節點,直接新建Node並加入連結串列即可。
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 如果首節點為TreeBin型別,說明為紅黑樹結構,執行putTreeVal操作。
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 如果節點數>=8,那麼轉換連結串列結構為紅黑樹結構。
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 計數增加1,有可能觸發transfer操作(擴容)。
    addCount(1L, binCount);
    return null;
}
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

/*
 *但是這邊為什麼i要等於((long)i << ASHIFT) + ABASE呢,計算偏移量
 *ASHIFT是指tab[i]中第i個元素在相對於陣列第一個元素的偏移量,而ABASE就算第一陣列的記憶體素的偏移地址
 *所以呢,((long)i << ASHIFT) + ABASE就算i最後的地址
 * 那麼compareAndSwapObject的作用就算tab[i]和c比較,如果相等就tab[i]=v否則tab[i]=c;
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

我們還是繼續一步步看程式碼,看inputVal的註釋a,這個方法helpTransfer,如果執行緒進入到這邊說明已經有其他執行緒正在做擴容操作,這個是一個輔助方法

/**
 * Helps transfer if a resize is in progress.
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            //下面幾種情況和addCount的方法一樣,請參考addCount的備註
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

當我們的putVal執行到addCount的時候

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;

    //U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 每次竟來都baseCount都加1因為x=1
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//1
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //多執行緒CAS發生失敗的時候執行
            fullAddCount(x, uncontended);//2
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //當條件滿足開始擴容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {//如果小於0說明已經有執行緒在進行擴容操作了
                //一下的情況說明已經有在擴容或者多執行緒進行了擴容,其他執行緒直接break不要進入擴容操作
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//如果相等說明擴容已經完成,可以繼續擴容
                    transfer(tab, nt);
            }
            //這個時候sizeCtl已經等於(rs << RESIZE_STAMP_SHIFT) + 2等於一個大的負數,這邊加上2很巧妙,因為transfer後面對sizeCtl--操作的時候,最多隻能減兩次就結束
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

看上面註釋1,每次都會對baseCount 加1,如果併發競爭太大,那麼可能導致U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x) 失敗,那麼為了提高高併發的時候baseCount可見性失敗的問題,又避免一直重試,這樣效能會有很大的影響,那麼在jdk8的時候是有引入一個類Striped64,其中LongAdder和DoubleAdder就是對這個類的實現。這兩個方法都是為解決高併發場景而生的,是AtomicLong的加強版,AtomicLong在高併發場景效能會比LongAdder差。但是LongAdder的空間複雜度會高點。

我們每次進來都對baseCount進行加1當達到一定的容量時,就需要對table進行擴容。擴容方法就是transfer,這個方法稍微複雜一點,大部分的程式碼我都做了註釋

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //構建一個連節點的指標,用於標識位
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    //迴圈的關鍵變數,判斷是否已經擴容完成,完成就return,退出迴圈
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        //迴圈的關鍵i,i--操作保證了倒序遍歷陣列
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {//nextIndex=transferIndex=n=tab.length(預設16)
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //i<0說明已經遍歷完舊的陣列tab;i>=n什麼時候有可能呢?在下面看到i=n,所以目前i最大應該是n吧。
        //i+n>=nextn,nextn=nextTab.length,所以如果滿足i+n>=nextn說明已經擴容完成
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {// a
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            //利用CAS方法更新這個擴容閾值,在這裡面sizectl值減一,說明新加入一個執行緒參與到擴容操作,參考sizeCtl的註釋
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //如果有多個執行緒進行擴容,那麼這個值在第二個執行緒以後就不會相等,因為sizeCtl已經被減1了,所以後面的執行緒就只能直接返回,始終保證只有一個執行緒執行了 a(上面註釋a)
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;//finishing和advance保證執行緒已經擴容完成了可以退出迴圈
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)//如果tab[i]為null,那麼就把fwd插入到tab[i],表明這個節點已經處理過了
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)//那麼如果f.hash=-1的話說明該節點為ForwardingNode,說明該節點已經處理過了
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        //這邊還對連結串列進行遍歷,這邊的的演算法和hashmap的演算法又不一樣了,這班是有點對半拆分的感覺
                        //把連結串列分表拆分為,hash&n等於0和不等於0的,然後分別放在新表的i和i+n位置
                        //次方法同hashmap的resize
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        //把已經替換的節點的舊tab的i的位置用fwd替換,fwd包含nextTab
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }//下面紅黑樹基本和連結串列差不多
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //判斷擴容後是否還需要紅黑樹結構
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

值得細細品味的是,transfer的for迴圈是倒敘的,說明對table的遍歷是從table.length-1開始到0的。我覺得這段程式碼寫得太牛逼了,特別是

//利用CAS方法更新這個擴容閾值,在這裡面sizectl值減一,說明新加入一個執行緒參與到擴容操作,參考sizeCtl的註釋
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    //如果有多個執行緒進行擴容,那麼這個值在第二個執行緒以後就不會相等,因為sizeCtl已經被減1了,所以後面的執行緒就只能直接返回,始終保證只有一個執行緒執行了 a(上面註釋a)
    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
        return;
    finishing = advance = true;//finishing和advance保證執行緒已經擴容完成了可以退出迴圈
    i = n; // recheck before commit
}

注意:如果連結串列結構中元素超過TREEIFY_THRESHOLD閾值,預設為8個,則把連結串列轉化為紅黑樹,提高遍歷查詢效率.接下來我們看看如何構造樹結構,程式碼如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

可以看出,生成樹節點的程式碼塊是同步的,進入同步程式碼塊之後,再次驗證table中index位置元素是否被修改過。
1、根據table中index位置Node連結串列,重新生成一個hd為頭結點的TreeNode連結串列。
2、根據hd頭結點,生成TreeBin樹結構,並把樹結構的root節點寫到table的index位置的記憶體中,具體實現如下:

TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

get()方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)//如果eh=-1就說明e節點為ForWordingNode,這說明什麼,說明這個節點已經不存在了,被另一個執行緒正則擴容
        //所以要查詢key對應的值的話,直接到新newtable找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

這個get請求,我們需要cas來保證變數的原子性。如果tab[i]正被鎖住,那麼CAS就會失敗,失敗之後就會不斷的重試。這也保證了get在高併發情況下不會出錯。
我們來分析下到底有多少種情況會導致get在併發的情況下可能取不到值。1、一個執行緒在get的時候,另一個執行緒在對同一個key的node進行remove操作;2、一個執行緒在get的時候,另一個執行緒正則重排table。可能導致舊table取不到值。
那麼本質是,我在get的時候,有其他執行緒在對同一桶的連結串列或樹進行修改。那麼get是怎麼保證同步性的呢?我們看到e = tabAt(tab, (n - 1) & h)) != null,在看下tablAt到底是幹嘛的:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

它是對tab[i]進行原子性的讀取,因為我們知道putVal等對table的桶操作是有加鎖的,那麼一般情況下我們對桶的讀也是要加鎖的,但是我們這邊為什麼不需要加鎖呢?因為我們用了Unsafe的getObjectVolatile,因為table是volatile型別,所以對tab[i]的原子請求也是可見的。因為如果同步正確的情況下,根據happens-before原則,對volatile域的寫入操作happens-before於每一個後續對同一域的讀操作。所以不管其他執行緒對table連結串列或樹的修改,都對get讀取可見。

參考

相關推薦

jdk1.8ConcurrentHashMap實現原理

併發環境下為什麼使用ConcurrentHashMap 1. HashMap在高併發的環境下,執行put操作會導致HashMap的Entry連結串列形成環形資料結構,從而導致Entry的next節點始終不為空,因此產生死迴圈獲取Entry 2. HashTa

JDK1.8HashMap實現

替換 應該 初始化 第一個元素 擴容 實現 1.8 put 相同 JDK1.8中的HashMap實現跟JDK1.7中的實現有很大差別。下面分析JDK1.8中的實現,主要看put和get方法。 構造方法的時候並沒有初始化,而是在第一次put的時候初始化 put

1.jdk1.8hashMap的原理,hash衝突如何解決

一:hashMap的工作原理        HashMap是基於鏈地址法的原理,使用put(key, value)儲存物件到HashMap中,使用get(key)從HashMap中獲取物件。        當我們給put()方法傳遞鍵和值時,我們先對鍵呼叫hashCode

HashMap 在JDK1.8實現

摘要HashMap是Java程式設計師使用頻率最高的用於對映(鍵值對)處理的資料型別。隨著JDK(Java Developmet Kit)版本的更新,JDK1.8對HashMap底層的實現進行了優化,例如引入紅黑樹的資料結構和擴容的優化等。本文結合JDK1.7和JDK1.8的區別,深入探討HashMap的結構

ConcurrentHashMap JDK1.8結構原理及原始碼分析

注:本文根據網路和部分書籍整理基於JDK1.7書寫,如有雷同敬請諒解  歡迎指正文中的錯誤之處。 資料結構       ConcurrentHashMap 1.8 拋棄了Segment分段鎖機制,採用Node + CAS + Synchronized來保證併發安全進行實現

JDK1.8ArrayList的實現原理及原始碼分析

一、概述              ArrayList是Java開發中使用比較頻繁的一個類,通過對原始碼的解讀,可以瞭解ArrayList的內部結構以及實現方法,清楚它的優缺點,以便我們在程式設計時靈活運用。 二、原始碼分析 2.1 類結構  JDK1.8原始碼中的A

HashMap在jdk1.7和1.8實現

Java集合類的原始碼是深入學習Java非常好的素材,原始碼裡很多優雅的寫法和思路,會讓人歎為觀止。HashMap的原始碼尤為經典,是非常值得去深入研究的,jdk1.8中HashMap發生了比較大的變化,這方面的東西也是各個公司高頻的考點。網上也有很多應對面試的標準答案,我之前也寫過類似的面

【必備技能】HashMap在jdk1.7和1.8實現

static final int TREEIFY_THRESHOLD = 8; public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } final V putVal(i

[技術分享]-ConcurrentHashMapjdk1.8的改進

一、簡單回顧ConcurrentHashMap在jdk1.7中的設計與Hashtable不同的是,ConcurrentHashMap使用的是分段鎖技術,將ConcurrentHashMap容器的資料分段儲存,每一段資料分配一個Segment,當執行緒佔用一個Segment時,

jdk1.6 1.7 1.8 LinkedList原始碼實現原理及區別

LinkedList(jdk1.6) private transient Entry<E> header = new Entry<E>(null, null, null); 定義一個空的Entry物件作為頭結點,Entry是其內部

Java併發程式設計總結4——ConcurrentHashMapjdk1.8的改進

一、簡單回顧ConcurrentHashMap在jdk1.7中的設計     先簡單看下ConcurrentHashMap類在jdk1.7中的設計,其基本結構如圖所示: 每一個segment都是一個HashEntry<K,V>[] table, table中的每一個元素本質上都是一個Has

JDK1.8 的hashmap和concurrentHashMap

hashmap 在JDK1.6中,HashMap採用Node陣列+連結串列實現,即使用連結串列處理衝突,同一hash值的連結串列都儲存在一個連結串列裡。但是當位於一個桶中的元素較多,即hash值相等的元素較多時,通過key值依次查詢的效率較低。而JDK1.8中

C#foreach實現原理

示例 元素 res 過程 false 編程語言 static posit this 本文主要記錄我在學習C#中foreach遍歷原理的心得體會。 對集合中的要素進行遍歷是所有編碼中經常涉及到的操作,因此大部分編程語言都把此過程寫進了語法中,比如C#中的foreach。經

ConcurrentHashMap實現原理

過時 initial 初始化 bin 重新 hashcode his 就是 cati ConcurrentHashMap采用了分段加鎖的方式看看get操作hashTable和ConcurrenHashMap的區別 public synchronized V get(Ob

Java8 ConcurrentHashMap工作原理的要點分析

tail dtree outer initial 而不是 ubd rule 設定 tree 簡介: 本文主要介紹Java8中的並發容器ConcurrentHashMap的工作原理,和其它文章不同的是,本文重點分析了不同線程的各類並發操作如get,put,remove之間是如

jdk1.8接口可以寫默認方法

wheel void JD PE 靜態 調用 默認 sta default interface Vehicle {  default void print(){     System.out.println("我是一輛車!");   }   stat

ConcurrentHashMap實現原理以及源碼分析

賦值 already 設計 [] 取數 ole vat 復制 變化 ConcurrentHashMap是HashMap的高並發版本,是線程安全的,而HashMap是非線程安全的 一、底層實現 底層結構跟hashmap一樣,都是通過數組+鏈表+紅黑樹實現的,不過它要保證線程

ConcurrentHashMap實現原理以及原始碼解析

ConcurrentHashMap實現原理以及原始碼解析 ConcurrentHashMap是Java1.5中引用的一個執行緒安全的支援高併發的HashMap集合類。 1、執行緒不安全的HashMap 因為多執行緒環境下,使用Hashmap進行put操作會引起死迴圈

Redux combineReducers實現原理

使用 product patch 分支結構 實現原理 復合 reducer 實現 判斷 使用一個reducer const initialState = { id : 2, name : ‘myName

JDK1.8TreeMap原始碼解析——紅黑樹刪除

在看本文之前建議先看一下二叉樹的刪除過程,這裡有一篇文章寫得不錯,可以看一下 1、後繼節點 在看原始碼之前,先說說紅黑樹尋找 待刪除節點t 的 後繼節點 的過程: 如果待刪除節點t有右節點,那麼後繼節點為該節點右子樹中最左的節點,也就是右子樹中值最小的節