1. 程式人生 > >ConcurrentHashMap原始碼之put和get方法

ConcurrentHashMap原始碼之put和get方法

以下ConcurrentHashMap類是基於jdk1.7來分析。

JDK1.7中ConcurrentHashMap是通過segments陣列和HashEntry陣列+連結串列來進行實現的。利用鎖分段技術,支援任務數量執行緒的讀和一定數量執行緒的寫。

我們看下ConcurrentHashMap是怎麼進行put和get操作的。

1、ConcurrentHashMap的put方法不能插入null值(為什麼?自行百度),在put kv值時,首先取key的hash值,通過hash值判斷key所在的segment,然後使用unsafe類的本地方法獲取此segments陣列中hash值對應的segment是否為null(為什麼用unsafe類呢?因為需要獲取記憶體中最新的儲存值,

關於unsafe類直接操作記憶體,參考這裡。),如果為null,則初始化segment元素,然後呼叫segment的put方法。ConcurrentHashMap類的put方法原始碼如下

//ConcurrentHashMap的put方法
public V put(K key, V value) {
        Segment<K,V> s;
//value值為null,直接報異常
        if (value == null)
            throw new NullPointerException();
//兩次hash,獲得key的雜湊值
        int hash = hash(key);
//對hash值的高位和segmentMask掩碼做按位與,確定key所在的segment(segmentMask=segment的長度-1)
        int j = (hash >>> segmentShift) & segmentMask;
//通過Unsafe類獲取segments陣列中下標為j的元素,如果不存在就初始化segment。(SSHIFT和SBASE均為確定陣列元素的記憶體位置,見以下變數宣告和static塊初始化)
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

// Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long SBASE;
    private static final int SSHIFT;
    private static final long TBASE;
    private static final int TSHIFT;
    private static final long HASHSEED_OFFSET;
    private static final long SEGSHIFT_OFFSET;
    private static final long SEGMASK_OFFSET;
    private static final long SEGMENTS_OFFSET;
    static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            TBASE = UNSAFE.arrayBaseOffset(tc);
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ts = UNSAFE.arrayIndexScale(tc);
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
            throw new Error("data type scale not a power of two");
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }

ConcurrentHashMap的ensureSegment方法,獲取下標為 j 的segment元素方法,如果不存在則初始化。

private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
//獲取k的位置偏移量
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
//驗證ss陣列中u處的元素是否為null
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//獲取下標為0處的segment,目的是以第一個segment為模板,建立segment元素
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//再次驗證sh陣列中偏移量為u的元素是否為null
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck

            //建立segment元素
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
//利用unsafe類,迴圈原子的去設定u處的segment的值為新segment元素,設定成功則返回,如果此時有其他執行緒已經建立了u處的segment元素,則也返回。
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

通過ConcurrentHashMap的put方法發現,定位到segment後,主要呼叫segment的put方法來操作,以下是segment的put(K key, int hash, V value, boolean onlyIfAbsent)方法原始碼。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            //走到這裡說明,已經獲取到了segment上的鎖。以下的操作均是在加鎖的情況下進行。
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                //獲取index處最新的頭結點
                HashEntry<K,V> first = entryAt(tab, index);
                //獲取到index處的HashEntry頭節點後,迴圈去查詢Key
                for (HashEntry<K,V> e = first;;) {
                    //查詢Key是否已經存在,如果存在則用新值替換舊值。
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    //走到這裡,說明連結串列中不存在此Key。
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //擴容,重新hash
                            rehash(node);
                        else
                            //將新結點插入到Index處,內部使用unsafe類操作。
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

從以上原始碼可以看出,segment的put方法首先去獲取此segment上的鎖。然後獲取index處最新的頭結點,為什麼不直接tab[index]取出頭結點呢,因為可能有其他執行緒修改過此index處的entry連結串列。對index處的連結串列迴圈,包含兩部分,第一個if中說明如果key已經在此連結串列中存在了,則用新value替換oldValue,返回oldValue;進入第二個if說明在連結串列中沒有找到對應的key值,則將新節點插入到頭結點之前,新節點當做頭結點,使用unsafe類更新table中的entry值。最後finally中,當key,value值設定完畢,直接解鎖。

為什麼第一個if中直接用e.value = value操作來更新,而第二個if中不用tab[index]=node來更新呢? 這需要看HashEntry節點的組成。jdk1.7中HashEntry節點原始碼如下,value是volatile變數,當value改變時,其他執行緒可以獲取到最新value,而HashEntry不是volatile修飾的,直接賦值其他執行緒不能看到最新的value值,所以需要藉助unsafe類來操作。

//HashEntry類
static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

從put操作的分析可以知道,ConcurrentHashMap的讀操作為什麼不需要加鎖了。在put操作時,無論是替換key的value值還是新增key value值,都能保證將最新值更新到主記憶體中。

在segment的put方法中,還有兩個方法,一個是獲取鎖時的tryLock方法和scanAndLockForPut方法,我們繼續看下這兩個方法。tryLock直接呼叫ReentrantLock的tryLock方法,此方法成功更新AQS中的同步狀態則表明獲取到鎖,返回true,否則返回false。具體可以參考這裡。

如果為false,則呼叫segment的scanAndLockForPut方法,此方法原始碼如下。

//segment的scanAndLockForPut方法
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//根據segment和hash值獲取對應key的HashEntry(頭結點)。
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
//第一個if確定key是否在此entry連結串列中存在,不存在則建立新節點。存在時,開始retry。
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
//第二個if判斷retry是否停止,阻塞等待獲取鎖。
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
//retry次數為偶數時,判斷頭結點是否有變化,有變化則變數歸位,重新迴圈。
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

scanAndLockForPut方法主要功能是獲取segment的鎖,返回值:key在segment中不存在則返回新節點,存在則返回null。此方法並沒有直接呼叫lock阻塞等待,而是在等待鎖可用的同時建立對應的節點。此方法會在兩種情況下返回:一是tryLock獲取到鎖,二是retry達到一定次數後,lock阻塞等待獲取到鎖。while迴圈中有3個if,第一個if(retries < 0) 是為了確定key是否在segment中存在,如果不存在則建立新的節點,存在時,開始retry,並沒有返回找到的節點(因為在外層會再次定位key,此時返回的可能是過期值)。第一個if確定後,第二個if是重試次數++,超過一定次數後,直接阻塞等待;第三個if是如果重試次數是偶數,則重新去記憶體中取最新的頭結點,如果有變化(有其他執行緒插入了節點),則變數歸位,重新迴圈。

 其中,entryForHash方法是根據segment和hash值獲取對應key的HashEntry。

2、以上分析了ConcurrentHashMap的put原始碼,現在再來看get原始碼。找到key對應的值,返回value,找不到返回null。get操作沒有加鎖,利用unsafe類和volatile修飾符來獲取最新的值(從主記憶體獲取那一刻是最新的,保證不是髒資料)。

get操作第一步取key的hash值,然後定位所在segment,並用unsafe類取最新的segment,最後定位到Key所在的HashEntry頭結點,迴圈連結串列獲取key對應的value值。

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

參考:

Java中Unsafe類詳解

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析