1. 程式人生 > >Java集合框架閱讀筆記(三)ConcurrentHashMap

Java集合框架閱讀筆記(三)ConcurrentHashMap

類繼承 only d+ nan next related ati null lur

預備知識

  • AQS(AbstractQueuedSynchronizer):提供了一個框架用來構造同步一些工具類比如ReentrantLock、 CopyOnWriteArrayList、 CountDownLatch、Semaphore等。通過維護一個volatile修飾的變量state表示鎖的獲取情況和一個FIFO隊列用來存儲線程。

API:

Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues. This class is designed to
be a useful basis for most kinds of synchronizers that rely on a
single atomic {@code int} value to represent state.

ConcurrentHashMap

介紹

  • ConcurrentHashMap是一個線程安全的HashMap,繼承體系如下
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

JDK8和JDK7的區別

存儲結構

  • JDK8存儲結構采用Node類型數組加鏈表並通過拉鏈法解決hash沖突而JDK7基於AQS實現了一個Segment類繼承自ReentrantLock,每個Segment對象持有一個HashEntry數組。Node和HashEntry都是實現了Map.Entry接口,只不過名字不同。

並發更新操作的實現

  • JDK8通過synchronized+CAS機制進行並發更新,鎖對象是數組下標對應的元素持有的Monitor。JDK7繼承了AQS裏的ReentrantLock進行加鎖實現的並發更新。

計算size

  • JDK8通過維護一個由volatile修飾的baseCount變量進行計數,以及一個CounterCell類進行記錄變化的次數來確定size。JDK7采用延遲計算,在計算過程中會對每個Segment計算至少兩次,如果出現數據不一致現象就進行全部加鎖最後求得size。

部分源碼分析

public V put(K key, V value)

直接調用putVal(K key, V value, boolean onlyIfAbsent)

public V put(K key, V value) {
    return putVal(key, value, false);
}
  • JDK8
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 預處理
    if (key == null || value == null) throw new NullPointerException();
    // 為了分布均勻對key的hashcode再hash
    int hash = spread(key.hashCode());
    // 根據binCount通過計算鏈表長度,判斷鏈表是否修改過,如果鏈表的數量超過設定的閾值,插入完成之後需要變成紅黑樹
    int binCount = 0;
    // 初始化tab,死循環直到插入成功
    for (Node<K,V>[] tab = table;;) {
        // 指向目標節點
        Node<K,V> f; 
        int n, i, fh;
        // 延遲加載:如果沒有初始化進行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果數組下標對應的元素為null,通過CAS放到當前數組下標處
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果當前節點的hash值為MOVED常量,則表示這個ConcurrentHashMap正在進行擴容,則當前線程幫助擴容helpTransfer(tab,f)。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 當前數組下標有元素,並且可以正常插入,則根據節點元素類型是紅黑樹節點還是鏈表節點然後執行插入
        else {
            V oldVal = null;
            // 加鎖,Monitor對象是當前數組下標元素持有的Monitor
            synchronized (f) {
                // tabAt(tab,i)會通過Unsafe類求出數組下標元素,再次判斷確保是否是上面所確定的f
                if (tabAt(tab, i) == f) {
                    // 代表是鏈表元素,執行插入鏈表操作
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果已經存在此key,則更新即可
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 不存在,則插入到鏈表尾部
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // 當前數組下標元素類型是紅黑樹,執行紅黑樹插入操作
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 根據binCount判斷鏈表元素個數
            if (binCount != 0) {
                // 超過閾值,則將鏈表變成紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 將當前維護的baseCount + 1
    addCount(1L, binCount);
    return null;
}
  • JDK7
public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null) throw new NullPointerException();
    int hash = hash(key);
    // UNSAFE通過hash找到Segment
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K, V>) UNSAFE.getObject          // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 加鎖,如果加鎖成功則執行後續操作,沒有競爭到鎖就需要自旋直到獲取到鎖並執行插入操作。
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 找到引用的數組
        HashEntry<K,V>[] tab = table;
        // 求模計算數組下標
        int index = (tab.length - 1) & hash;
        // 找到數組第一個元素
        HashEntry<K,V> first = entryAt(tab, index);
        // 死循環,直到插入成功
        for (HashEntry<K,V> e = first;;) {
            // 數組對應下標已經存在數值。
            if (e != null) {
                K k;
                // 已經存在此key,進行更新
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        // 記錄每一個Segment中的HashEntry數組中元素的數量
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            // 如果當前數組下標已經存在元素
            } else {
                // 當前線程可能自旋過(自旋過程可能會返回一個已經初始化的node),所以需要判斷node是否為null
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 判斷是否需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 釋放鎖
        unlock();
    }
    return oldValue;
}

public V get(Object key)

  • JDK8
public V get(Object key) {
    Node<K,V>[] tab; 
    Node<K,V> e, p; 
    int n, eh; 
    K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 在紅黑樹中查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 在鏈表中查找
        while ((e = e.next) != null) {
            if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  • JDK7
public V get(Object key) {
    Segment<K, V> s; // manually integrate access methods to reduce overhead
    HashEntry<K, V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 通過UNSAFE獲取Segment,通過Segment獲取HashEntry數組引用,進而找到指定的key
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
                (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

public int size()

  • JDK8

通過由volatile修飾的baseCount變量以及CounterCell對象記錄變化的次數求出size()

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    // 獲取baseCount
    long sum = baseCount;
    if (as != null) {
        // 循環遍歷每一個CountCell中的value
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
  • JDK7

通過計算Segment數組中每一個Segment中的modcount(在put時會++modCount)求出sum。但是不止求一次sum,會至少求兩次(由RETRIES_BEFORE_LOCK決定的),如果兩次求出的sum不一致,下次求就會將Segment數組全部加鎖,重新求一次。

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K, V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn‘t retry
    try {
        for (; ; ) {
            // 超出兩次,將Segment全部加鎖
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K, V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // 獲取Segment對象的modCount
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 和上一次求的結果比較
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

參考

  • https://www.jianshu.com/p/e694f1e868ec
  • http://penghb.com/2017/10/27/java/concurrentHashMap/
  • http://www.importnew.com/26049.html
  • https://swenfang.github.io/2018/06/03/Java%208%20ConcurrentHashMap%20%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB/

Java集合框架閱讀筆記(三)ConcurrentHashMap