1. 程式人生 > >ConcurrentHashMap 源碼淺析 1.7

ConcurrentHashMap 源碼淺析 1.7

高並發 threshold 1.5 鍵值 eat vol amp 根據 鏈式

  • 簡介

    (1) 背景
    HashMap死循環:HashMap在並發執行put操作時會引起死循環,是因為多線程會導致HashMap的Entry鏈表形成環形數據結構,一旦形成環形數據結構,Entry的next節點永遠不為空,就會產生死循環獲取Entry.
    HashTable效率低下:HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下.因為當一個線程訪問HashTable的同步方法,其它線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態.如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法獲取元素,所以競爭越激烈效率越低.

    (2) 簡介
    HashTable容器在競爭激烈的並發環境下表現出效率低下的原因是所有訪問HashTable的線程都必須競爭一把鎖,假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麽多線程訪問容器裏不同的數據段時,線程間不會存在競爭,從而可以有效提高並發訪問效率,這就是ConcurrentHash所使用的鎖分段技術.首先將數據分成一段一段地儲存,然後給每一段配一把鎖,當一個線程占用鎖訪問其中一段數據時,其它段的數據也能被其它線程訪問.

  • 結構

    ConcurrentHash是由Segments數組結構和HashEntry數組結構組成.Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap裏扮演鎖的色;HashEntry則用於存儲鍵值對數據.一個ConcurrentHashMap裏包含一個Segment組.Segment的結構和HashMap類似,是一種數組加鏈表的結構.一個Segment裏包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護者一個HashEntry數組裏面的元素,當對HashEntry數組的數據進行修改時,必須先獲得與它對應的Segment鎖,如下圖所示.

    技術分享圖片

  • 基本成員
    default_initial_capacitymap默認容量,必須是2的冥

    /**
     * 默認的初始容量 16
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    default_load_factor默認負載因子(存儲的比例)

    /**
     * 默認的負載因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    default_concurrency_level默認並發數量,segments數組量

    /**
     * 默認的並發數量,會影響segments數組的長度
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    maximum_capacitymap最大容量

    /**
     * 最大容量,構造ConcurrentHashMap時指定的值超過,就用該值替換
     * ConcurrentHashMap大小必須是2^n,且小於等於2^30
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    min_segment_table_capacityHashEntry[]默認容量

    /**
     * 每個segment中table數組的長度,必須是2^n,至少為2
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    max_segments最大並發數,segments數組最大量

    /**
     * 允許最大segment數量,用於限定concurrencyLevel的邊界,必須是2^n
     */
    static final int MAX_SEGMENTS = 1 << 16;

    retries_before_lock重試次數,在加鎖之前

    /**
     * 非鎖定情況下調用size和contains方法的重試次數,避免由於table連續被修改導致無限重試
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    segmentMask計算segment位置的掩碼(segments.length-1)

    /**
     * 用於segment的掩碼值,用於與hash的高位進行取&
     */
    final int segmentMask;

    segmentShift

    /**
     * 用於算segment位置時,hash參與運算的位數
     */
    final int segmentShift;

    segmentssegment數組

    /**
     * segments數組
     */
    final Segment<K,V>[] segments;

    HashEntry存儲數據的鏈式結構

    static final class HashEntry<K,V> {
        // hash值
        final int hash;
        // key
        final K key;
        // 保證內存可見性,每次從內存中獲取
        volatile V value;
        volatile HashEntry<K,V> next;
    
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    
        /**
         * 使用volatile語義寫入next,保證可見性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    Segment繼承ReentrantLock鎖,用於存放HashEntry[]

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
    
        /**
         * 對segment加鎖時,在阻塞之前自旋的次數
         *
         */
        static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    
        /**
         * 每個segment的HashEntry table數組,訪問數組元素可以通過entryAt/setEntryAt提供的volatile語義來完成
         * volatile保證可見性
         */
        transient volatile HashEntry<K,V>[] table;
    
        /**
         * 元素的數量,只能在鎖中或者其他保證volatile可見性之間進行訪問
         */
        transient int count;
    
        /**
         * 當前segment中可變操作發生的次數,put,remove等,可能會溢出32位
         * 它為chm isEmpty() 和size()方法中的穩定性檢查提供了足夠的準確性.
         * 只能在鎖中或其他volatile讀保證可見性之間進行訪問
         */
        transient int modCount;
    
        /**
         * 當table大小超過閾值時,對table進行擴容,值為(int)(capacity *loadFactor)
         */
        transient int threshold;
    
        /**
         * 負載因子
         */
        final float loadFactor;
    
        /**
         * 構造方法
         */
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  • 構造方法
    有參構造
    /**
     * ConcurrentHashMap 構造方法
     * @param initialCapacity 初始化容量
     * @param loadFactor 負載因子
     * @param concurrencyLevel 並發segment,segments數組的長度
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 大於最大segments容量,取最大容量
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 2^sshift = ssize 例如:sshift = 4,ssize = 16
        // 根據concurrencyLevel計算出ssize為segments數組的長度
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) { // 第一次 滿足
            ++sshift;  // 第一次 1
            ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
        }
        // segmentShift和segmentMask的定義
        this.segmentShift = 32 - sshift; // 用於計算hash參與運算位數
        this.segmentMask = ssize - 1; // segments位置範圍
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 計算每個segment中table的容量
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // HashEntry[]默認 容量
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // 確保cap是2^n
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 創建segments並初始化第一個segment數組,其余的segment延遲初始化
        Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    無參構造使用默認參數
    public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

  • 基本方法

    一些UNSAFE方法
    HashEntry
    setNext

    /**
         * 使用volatile語義寫入next,保證可見性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    entryAt get HashEntry

    /**
     * 獲取給定table的第i個元素,使用volatile讀語義
     */
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
                (HashEntry<K,V>) UNSAFE.getObjectVolatile
                        (tab, ((long)i << TSHIFT) + TBASE);
    }

    setEntryAt set HashEntry

    /**
     * 設置給定的table的第i個元素,使用volatile寫語義
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    put 插入元素

  • 總結
  • ConcurrentHashMap 源碼淺析 1.7