1. 程式人生 > >Java7/8中的 HashMap和ConcurrentHashMap全解析

Java7/8中的 HashMap和ConcurrentHashMap全解析

原文出處: JavaDoop

今天發一篇"水文",可能很多讀者都會表示不理解,不過我想把它作為併發序列文章中不可缺少的一塊來介紹。本來以為花不了多少時間的,不過最終還是投入了挺多時間來完成這篇文章的。

網上關於 HashMap 和 ConcurrentHashMap 的文章確實不少,不過缺斤少兩的文章比較多,所以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。

閱讀建議:四節基本上可以進行獨立閱讀,建議初學者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 順序進行閱讀,可適當降低閱讀門檻。

閱讀前提:本文分析的是原始碼,所以至少讀者要熟悉它們的介面使用,同時,對於併發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查詢相關資料。

雜湊表

介紹

雜湊表就是一種以 鍵-值(key-indexed) 儲存資料的結構,我們只要輸入待查詢的值即key,即可查詢到其對應的值。

雜湊的思路很簡單,如果所有的鍵都是整數,那麼就可以使用一個簡單的無序陣列來實現:將鍵作為索引,值即為其對應的值,這樣就可以快速訪問任意鍵的值。這是對於簡單的鍵的情況,我們將其擴充套件到可以處理更加複雜的型別的鍵。

鏈式雜湊表

鏈式雜湊表從根本上說是由一組連結串列構成。每個連結串列都可以看做是一個“桶”,我們將所有的元素通過雜湊的方式放到具體的不同的桶中。插入元素時,首先將其鍵傳入一個雜湊函式(該過程稱為雜湊鍵),函式通過雜湊的方式告知元素屬於哪個“桶”,然後在相應的連結串列頭插入元素。查詢或刪除元素時,用同們的方式先找到元素的“桶”,然後遍歷相應的連結串列,直到發現我們想要的元素。因為每個“桶”都是一個連結串列,所以鏈式雜湊表並不限制包含元素的個數。然而,如果表變得太大,它的效能將會降低。

應用場景

我們熟知的快取技術(比如redis、memcached)的核心其實就是在記憶體中維護一張巨大的雜湊表,還有大家熟知的HashMap、CurrentHashMap等的應用。

ConcurrentHashMap與HashMap等的區別

HashMap

我們知道HashMap是執行緒不安全的,在多執行緒環境下,使用Hashmap進行put操作會引起死迴圈,導致CPU利用率接近100%,所以在併發情況下不能使用HashMap。

HashTable

HashTable和HashMap的實現原理幾乎一樣,差別無非是

  • HashTable不允許key和value為null
  • HashTable是執行緒安全的

但是HashTable執行緒安全的策略實現代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,
這相當於給整個雜湊表加了一把大鎖。

多執行緒訪問時候,只要有一個執行緒訪問或操作該物件,那其他執行緒只能阻塞,相當於將所有的操作序列化,
在競爭激烈的併發場景中效能就會非常差。

ConcurrentHashMap

主要就是為了應對hashmap在併發環境下不安全而誕生的,ConcurrentHashMap的設計與實現非常精巧,
大量的利用了volatile,final,CAS等lock-free技術來減少鎖競爭對於效能的影響。

我們都知道Map一般都是陣列+連結串列結構(JDK1.8該為陣列+紅黑樹)。

ConcurrentHashMap避免了對全域性加鎖改成了區域性加鎖操作,這樣就極大地提高了併發環境下的操作速度,
由於ConcurrentHashMap在JDK1.7和1.8中的實現非常不同,接下來我們談談JDK在1.7和1.8中的區別。

Java7 HashMap

HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支援併發操作,所以原始碼也非常簡單。

首先,我們用下面這張圖來介紹 HashMap 的結構。

image

這個僅僅是示意圖,因為沒有考慮到陣列要擴容的情況,具體的後面再說。

大方向上,HashMap裡面是一個陣列,然後陣列中每個元素是一個單向連結串列

上圖中,每個綠色的實體是巢狀類 Entry 的例項,Entry 包含四個屬性:key, value, hash 值和用於單向連結串列的 next。

  • capacity:當前陣列容量,始終保持2^n,可以擴容,擴容後陣列大小為當前的 2 倍。

  • loadFactor:負載因子,預設為 0.75。

  • threshold:擴容的閾值,等於 capacity * loadFactor

put 過程分析

還是比較簡單的,跟著程式碼走一遍吧。


public V put(K key, V value) {
    // 當插入第一個元素的時候,需要先初始化陣列大小
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    // 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中
    if (key == null)
        return putForNullKey(value);
    // 1. 求 key 的 hash 值
    int hash = hash(key);
    // 2. 找到對應的陣列下標
    int i = indexFor(hash, table.length);
    // 3. 遍歷一下對應下標處的連結串列,看是否有重複的 key 已經存在,
    //    如果有,直接覆蓋,put 方法返回舊值就結束了
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }

    modCount++;
    // 4. 不存在重複的 key,將此 entry 新增到連結串列中,細節後面說
    addEntry(hash, key, value, i);
    return null;
}

陣列初始化

在第一個元素插入 HashMap 的時候做一次陣列的初始化,就是先確定初始的陣列大小,並計算陣列擴容的閾值。

private void inflateTable(int toSize) {
    // 保證陣列大小一定是 2 的 n 次方。
    // 比如這樣初始化:new HashMap(20),那麼處理成初始陣列大小是 32
    int capacity = roundUpToPowerOf2(toSize);
    // 計算擴容閾值:capacity * loadFactor
    threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
    // 算是初始化陣列吧
    table = new Entry[capacity];
    initHashSeedAsNeeded(capacity); //ignore
}

這裡有一個將陣列大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的程式碼稍微有些不同,後面再看到的時候就知道了。

計算具體陣列位置

這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對陣列長度進行取模就可以了。

static int indexFor(int hash, int length) {
    // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2";
    return hash & (length-1);
}

這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在陣列長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在陣列中的下標位置。

新增節點到連結串列中

找到陣列下標後,會先進行key判重,如果沒有重複,就準備將新值放入到連結串列的表頭。

void addEntry(int hash, K key, V value, int bucketIndex) {
    // 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容
    if ((size >= threshold) && (null != table[bucketIndex])) {
        // 擴容,後面會介紹一下
        resize(2 * table.length);
        // 擴容以後,重新計算 hash 值
        hash = (null != key) ? hash(key) : 0;
        // 重新計算擴容後的新的下標
        bucketIndex = indexFor(hash, table.length);
    }
    // 往下看
    createEntry(hash, key, value, bucketIndex);
}
// 這個很簡單,其實就是將新值放到連結串列的表頭,然後 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的資料插入到擴容後的陣列的相應位置處的連結串列的表頭。

陣列擴容

前面我們看到,在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的陣列位置上已經有元素,那麼就會觸發擴容,擴容後,陣列大小為原來的 2 倍。

void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    // 新的陣列
    Entry[] newTable = new Entry[newCapacity];
    // 將原來陣列中的值遷移到新的更大的陣列中
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

擴容就是用一個新的大陣列替換原來的小陣列,並將原來陣列中的值遷移到新的陣列中。

由於是雙倍擴容,遷移過程中,會將原來 table[i] 中的連結串列的所有節點,分拆到新的陣列的 newTable[i] 和 newTable[i + oldLength] 位置上。如原來陣列長度是 16,那麼擴容後,原來 table[0] 處的連結串列中的所有元素會被分配到新陣列中 newTable[0] 和 newTable[16] 這兩個位置。程式碼比較簡單,這裡就不展開了。

get 過程分析

相對於 put 過程,get 過程是非常簡單的。

根據 key 計算 hash 值。
找到相應的陣列下標:hash & (length - 1)。
遍歷該陣列位置處的連結串列,直到找到相等(==或equals)的 key。

public V get(Object key) {
    // 之前說過,key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的連結串列就可以了
    if (key == null)
        return getForNullKey();
    //
    Entry<K,V> entry = getEntry(key);

    return null == entry ? null : entry.getValue();
}

getEntry(key):

final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }

    int hash = (key == null) ? 0 : hash(key);
    // 確定陣列下標,然後從頭開始遍歷連結串列,直到找到為止
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}

Java7 ConcurrentHashMap

在JDK1.7中ConcurrentHashMap採用了陣列+Segment+分段鎖的方式實現。

  • Segment(分段鎖)

ConcurrentHashMap中的分段鎖稱為Segment,它即類似於HashMap的結構,即內部擁有一個Entry陣列,
陣列中的每個元素又是一個連結串列,同時又是一個ReentrantLock(Segment繼承了ReentrantLock)。

  • 內部結構

ConcurrentHashMap使用分段鎖技術,將資料分成一段一段的儲存,然後給每一段資料配一把鎖,
當一個執行緒佔用鎖訪問其中一個段資料的時候,其他段的資料也能被其他執行緒訪問,
能夠實現真正的併發訪問。

ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援併發操作,所以要複雜一些。

整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“”來代表一個 segment。

簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。

image

concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。

再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些。

從上面的結構我們可以瞭解到,ConcurrentHashMap定位一個元素的過程需要進行兩次Hash操作。

第一次Hash定位到Segment,第二次Hash定位到元素所在的連結串列的頭部

  • 該結構的優劣勢

    • 壞處:這一種結構的帶來的副作用是Hash的過程要比普通的HashMap要長

    • 好處:寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,

這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支援Segment數量大小的寫操作(剛好這些寫操作都非常平均地分佈在所有的Segment上)。

所以,通過這一種結構,ConcurrentHashMap的併發能力可以大大的提高。

初始化

initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。

loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4
    // 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是設定整個 map 初始的大小,
    // 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小
    // 如 initialCapacity 為 64,那麼每個 Segment 或稱之為"槽"可以分到 4 個
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
    // 插入一個元素不至於擴容,插入第二個的時候才會擴容
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 建立 Segment 陣列,
    // 並建立陣列的第一個元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往陣列寫入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

初始化完成,我們得到了一個 Segment 陣列。

我們就當是用 new ConcurrentHashMap() 無參建構函式進行初始化的,那麼初始化完成後:

  • Segment 陣列長度為 16,不可以擴容
  • Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  • 這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
  • 當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到

put 過程分析

我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 計算 key 的 hash 值
    int hash = hash(key);
    // 2. 根據 hash 值找到 Segment 陣列中的位置 j
    //    hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
    //    然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的高 4 位,也就是槽的陣列下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。

Segment 內部是由 陣列+連結串列 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖
    //    先看主流程,後面還會具體介紹這部分內容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個是 segment 內部的陣列
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應該放置的陣列下標
        int index = (tab.length - 1) & hash;
        // first 是陣列該位置處的連結串列的表頭
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面這串 for 迴圈雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個連結串列這兩種情況
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順著連結串列走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。
                // 如果不為 null,那就直接將它設定為連結串列表頭;如果是null,初始化並設定為連結串列表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);

                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個 segment 需要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容後面也會具體分析
                else
                    // 沒有達到閾值,將 node 放到陣列 tab 的 index 位置,
                    // 其實就是將新的節點設定成原連結串列的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裡面的併發問題,我們稍後再進行介紹。

到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。

初始化槽: ensureSegment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。

這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 這裡看到為什麼之前要初始化 segment[0] 了,
        // 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k]
        // 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
        Segment<K,V> proto = ss[0];
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);

        // 初始化 segment[k] 內部的陣列
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次檢查一遍該槽是否被其他執行緒初始化了。

            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    <