1. 程式人生 > >【Java8原始碼分析】併發包-ConcurrentHashMap(一)

【Java8原始碼分析】併發包-ConcurrentHashMap(一)

一、CAS原理簡介

Java8中,ConcurrentHashMap摒棄了Segment的概念,而是啟用了一種全新的方式實現:利用CAS演算法。它沿用了HashMap的思想,底層依然由“陣列”+連結串列+紅黑樹的方式實現。

那什麼CAS演算法呢?以前採用鎖的方式實現同步,對於簡單問題來說顯得粒度過大,影響效率。現在的處理器都支援多重處理,當然也包含多個處理器共享外圍裝置和記憶體,同時,加強了指令集以支援一些多處理的特殊需求。特別是幾乎所有的處理器都可以將其他處理器阻塞以便更新共享變數。

當前的處理器基本都支援CAS(Compare and swap),只不過每個廠家所實現的演算法並不一樣罷了,每一個CAS操作過程都包含三個運算子:一個記憶體地址V,一個期望的值A和一個新值B,操作的時候如果這個地址上存放的值等於這個期望的值A,則將地址上的值賦為新值B,否則不做任何操作。CAS的基本思路就是,如果這個地址上的值和期望的值相等,則給其賦予新值,否則不做任何事兒,但是要返回原值是多少。

簡單地說,CAS使得同步並不阻塞在程式語言層面上,而是阻塞在硬體層面上。

二、儲存結構

ConcurrentHashMap中有以下幾種常見的儲存結構:

  • Node:大部分key-value對都儲存在此結構中,用於hash衝突中,用連結串列法儲存
  • TreeNode:繼承Node,用於hash衝突中,用紅黑樹儲存
  • TreeBin:用來儲存TreeNode的根,並維護紅黑樹的新增、刪除等操作
  • ForwardingNode:在容器擴容時適用。被放置在bin的頭部
  • ReservationNode:在呼叫computeIfAbsent之類的函式時適用

(1)Node結構

    static
class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this
.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } // 不準對Node賦值 public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } // map.get的輔助方法 Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }

(2)TreeNode

    // TreeNode 只比普通的Node多了一些屬性,紅色樹的相關操作由TreeBin負責
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent; 
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

(3)TreeBin

TreeNode作為bin的頭節點,TreeBin實際上只是一系列的TreeNode和它們的根,並不包含key-value對。同時TreeBin還維護著這一系列TreeNode的讀寫鎖。

    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock

        // 建構函式
        TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }
        // 後面還有許多紅黑樹等相關操作
    }

三、屬性域

    /* 常量 */
    // 32位中的高2位被用來作為控制bit
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    private static final int DEFAULT_CAPACITY = 16;

    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // 未被用到,用來相容之前版本
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    // 跟hashmap一樣
    private static final float LOAD_FACTOR = 0.75f;
    static final int TREEIFY_THRESHOLD = 8;
    static final int UNTREEIFY_THRESHOLD = 6;
    static final int MIN_TREEIFY_CAPACITY = 64;

    // 最小重排步
    private static final int MIN_TRANSFER_STRIDE = 16;

    // sizeCtl中記錄size的bit數
    private static int RESIZE_STAMP_BITS = 16;

    // 參與擴容的最大執行緒數
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    // sizeCtl中記錄size大小的偏移量
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // 一些特定的雜湊值代表不同含義
    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

    // CPU數
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /* 屬性域 */
    // 桶表
    transient volatile Node<K,V>[] table;

    // 擴容時候使用
    private transient volatile Node<K,V>[] nextTable;

    // 沒有競爭條件時,使用
    private transient volatile long baseCount;

    // 用來初始化和擴容控制
    // == -1,代表初始化
    // < -1,-sizeCtl - 1 代表擴容
    // == 0,table為null
    // > 0,sizeCtl代表下次擴容容量
    private transient volatile int sizeCtl;

四、幾個重要方法

    // ConcurrentHashMap中的桶的訪問和賦值
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    // valatile方法,需在臨界區中執行
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

五、建構函式

    public ConcurrentHashMap() {
    }

    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    // 第三個引數為更新容器的執行緒數
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)
            initialCapacity = concurrencyLevel;
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

六、查詢操作

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());

        // 判斷table是否null或者未空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            // 注意:因為容器大小為2的次方,所以 h mod n = h & (n -1)
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 如果hash值相等
            if ((eh = e.hash) == h) {
                // 檢查第一個Node
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // hash值小於0,代表此bin非一般的連結串列Node
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;

            // 遍歷連結串列,對比key值
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

七、新增操作

    // key值和value值均不能為null
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // key值和value值均不能為null
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果table為空,執行初始化,也即是延遲初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 如果bin為空,則採用cas演算法賦值,無需加鎖
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break; 
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // bin非空,且bin為一般的Node連結串列時
            else {
                V oldVal = null;
                // 獲取第一個Node的鎖
                synchronized (f) {
                    // 再判斷以此f是否仍是第一個Node,如果不是,退出臨界區,重複新增操作
                    if (tabAt(tab, i) == f) {
                        // 根據hash值判斷,為一般的Node連結串列
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 如果key已經存在,替換,break退出
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 如果遍歷bin未找到key值相等等的,在bin末尾新增此Node
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果此bin是TreeBin
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 如果bin的容量大於臨界值,轉為紅黑樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 此函式內部會判斷是樹化,還是擴容:tryPresize
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 此函式也有可能導致擴容
        addCount(1L, binCount);
        return null;
    }

八、刪除操作

    public V remove(Object key) {
        return replaceNode(key, null, null);
    }

    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 不存在key值
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                // 替換操作,鎖定第一個Node
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // bin由Node連結串列組成
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                // 如果找到了
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            // 注意:呼叫此方法必須在臨界區中
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        // 如果是TreeBin的紅黑樹結構
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

九、其他

  • ConcurrentHashMap是延遲初始化的,只有在插入資料時,整個HashMap才被初始化為2的次方大小個桶(bin),每個bin包含雜湊值相同的一系列Node(一般含有0或1個Node)。每個bin的第一個Node作為這個bin的鎖,Hash值為零或者負的將被忽略
  • 每個bin的第一個Node插入用到CAS原理,這是在ConcurrentHashMap中最常發生的操作,其餘的插入、刪除、替換操作對bin中的第一個Node加鎖,進行操作
  • ConcurrentHashMap的size()函式一般比較少用,同時為了提高增刪查改的效率,容器並未在內部儲存一個size值,而且採用每次呼叫size()函式時累加各個bin中Node的個數計算得到,而且這一過程不加鎖,即得到的size值不一定是最新的

未完待續

參考