
ConcurrentHashMap: Implementation Principles and Source Code Analysis

## 1. ConcurrentHashMap Concepts
ConcurrentHashMap is a concurrent container in the JDK and can be seen as an enhanced HashMap. Its structure looks like this:

(Figure: ConcurrentHashMap structure diagram)

Compare this with the structure of HashMap:

(Figure: HashMap structure diagram)
Comparing the two diagrams, ConcurrentHashMap adds a layer of Segments in front of the HashEntry arrays. HashMap is not thread-safe, and concurrent writes to it can even trigger an infinite loop (during a resize in JDK 7), so in multi-threaded code HashMap is generally avoided in favor of ConcurrentHashMap. Through its segment-locking (lock-striping) mechanism, ConcurrentHashMap makes concurrent writes thread-safe while also improving access throughput under contention.
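The practical difference is easy to see with a small, self-contained demo (the class name and the thread/key counts below are arbitrary illustration values): several threads insert disjoint keys into one map, and with ConcurrentHashMap no insertion is lost.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentPutDemo {
    public static void main(String[] args) throws InterruptedException {
        // Many threads may write to a ConcurrentHashMap safely; doing the same
        // with a plain HashMap risks lost updates or, on JDK 7, an infinite
        // loop during a concurrent resize.
        Map<Integer, Integer> map = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int offset = t * 10_000;
            pool.execute(() -> {
                for (int i = 0; i < 10_000; i++) {
                    map.put(offset + i, i); // keys are disjoint per thread
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println(map.size()); // 40000 -- no entries are lost
    }
}
```

Run the same loops against a plain HashMap and the final size typically falls short of 40000 (or the program hangs in a resize loop on JDK 7); that is exactly the problem the segment locks solve.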


## 2. Analyzing the ConcurrentHashMap Implementation from the Source Code
Let's start with the constructor:

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

The HashMap constructor takes two parameters, while ConcurrentHashMap takes a third. The three are initialCapacity (initial capacity), loadFactor (load factor), and concurrencyLevel (concurrency level). Several local variables matter here: sshift, ssize, segmentShift, segmentMask, and cap. The constructor first validates the arguments, then derives the size of the Segment array, ssize, from the concurrency level, with sshift counting the shifts so that 2^sshift == ssize. Like the table size in HashMap, ssize is always a power of two; my other post on HashMap explains why in detail. Once the Segment array size is fixed, the constructor computes cap, the size of each Segment's HashEntry array. cap is also a power of two, chosen so that cap * ssize is not smaller than initialCapacity. Finally it creates one Segment (segments[0]) and stores it into the Segment array, producing the whole ConcurrentHashMap. From this you can see that plain HashMap keeps its whole capacity of cap * ssize in a single, unsegmented HashEntry array, whereas ConcurrentHashMap splits the same capacity into ssize segments, each holding a HashEntry array of size cap.
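To make the arithmetic concrete, here is a minimal sketch that replays the sizing loops with sample arguments (initialCapacity = 33 and concurrencyLevel = 10 are arbitrary example values; MIN_SEGMENT_TABLE_CAPACITY = 2 is assumed from the JDK 7 source):

```java
public class SegmentSizingDemo {
    public static void main(String[] args) {
        // Worked example of the constructor's sizing math; for illustration only.
        int initialCapacity = 33;
        int concurrencyLevel = 10;

        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; }
        // ssize = 16 (smallest power of two >= 10), sshift = 4

        int segmentShift = 32 - sshift; // 28: shift applied to the hash to pick a segment
        int segmentMask = ssize - 1;    // 15: mask for the segment index

        int c = initialCapacity / ssize;      // 33 / 16 = 2
        if (c * ssize < initialCapacity) c++; // round up -> 3
        int cap = 2;                          // MIN_SEGMENT_TABLE_CAPACITY
        while (cap < c) cap <<= 1;            // cap = 4

        System.out.printf("ssize=%d sshift=%d shift=%d mask=%d cap=%d%n",
                ssize, sshift, segmentShift, segmentMask, cap);
        // Prints: ssize=16 sshift=4 shift=28 mask=15 cap=4
    }
}
```

So this example map starts with 16 segments of 4 buckets each, 64 slots in total, which is at least the requested 33. With the sizing settled, let's look at the Segment constructor: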

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

This constructor takes three parameters: lf (the load factor), threshold (the resize threshold), and tab (the HashEntry table). In effect, each Segment is a miniature HashMap-like container of its own.
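For context, the JDK 7 Segment is itself a ReentrantLock that owns its table. A stripped-down sketch of the Segment and HashEntry shapes (fields taken from the JDK 7 source, almost all methods omitted) looks roughly like this:

```java
import java.io.Serializable;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch, not the full JDK source. Extending ReentrantLock is what
// lets each segment act as its own lock in the segmented-locking scheme.
class SegmentSketch {

    static final class HashEntry<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K, V> next;   // buckets are singly linked chains

        HashEntry(int hash, K key, V value, HashEntry<K, V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    static final class Segment<K, V> extends ReentrantLock implements Serializable {
        transient volatile HashEntry<K, V>[] table; // this segment's bucket array
        transient int count;                        // entries stored in this segment
        transient int modCount;                     // structural modification counter
        transient int threshold;                    // resize once count exceeds this
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K, V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
    }
}
```

Because each Segment is its own lock, writers that land in different segments never contend with one another. Next, let's look at the put operation: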

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject         
             (segments, (j << SSHIFT) + SBASE)) == null) 
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

This is ConcurrentHashMap's own put. It first rejects null values, then runs the key through the hash function, shifts the hash right by segmentShift and masks it with segmentMask (the segment count minus one) so that the top bits of the hash select the segment index, loads the corresponding Segment (creating it via ensureSegment if it is still null), and finally delegates to that segment's put.
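As a quick illustration of that index arithmetic (the hash value below is made up; segmentShift = 28 and segmentMask = 15 match the 16-segment example computed earlier):

```java
public class SegmentIndexDemo {
    public static void main(String[] args) {
        // Hypothetical values for illustration: with 16 segments, the top
        // 4 bits of the (already spread) hash pick the segment.
        int segmentShift = 28;
        int segmentMask = 15;
        int hash = 0xA35F12C4;           // made-up hash value

        int j = (hash >>> segmentShift) & segmentMask;
        System.out.println(j);           // 0xA = 10 -> segments[10]
    }
}
```

In other words, the top sshift bits of the hash choose the segment. Here is the Segment-level put that the call then delegates to: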

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

In this method the thread first tries to acquire the segment's lock (falling back to scanAndLockForPut, which spins and may pre-create the node, if tryLock fails). Once the lock is held, it ANDs the hash with the HashEntry array length minus one to get the bucket index and reads the head node of that bucket. If the chain is non-empty, it walks the chain looking for a node with an equal key; if one is found, its value is replaced (unless onlyIfAbsent is set). If no matching node exists, or the bucket is empty, a new HashEntry is created (or the pre-created node is reused) and linked in at the head of the chain; if the new count would exceed the threshold and the table is still below MAXIMUM_CAPACITY, the segment's hash table is rehashed (grown) first. Finally the lock is released.
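The onlyIfAbsent flag is what separates the public put from putIfAbsent, which in the JDK 7 source calls this same internal method with onlyIfAbsent set to true. A small usage example:

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutVsPutIfAbsent {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

        map.put("k", "v1");               // internally put(key, hash, value, false)
        map.put("k", "v2");               // existing value is overwritten
        System.out.println(map.get("k")); // v2

        map.putIfAbsent("k", "v3");       // internally put(key, hash, value, true)
        System.out.println(map.get("k")); // still v2: onlyIfAbsent skipped the write
    }
}
```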
Next, let's look at the get operation:

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

In the get operation the key is hashed first, the hash locates the segment, and the same hash then locates the bucket in that segment's HashEntry array. The chain at that bucket is traversed looking for a node with an equal key; if one is found its value is returned, otherwise null. Note that get takes no lock at all: it reads the segment and the bucket head through UNSAFE.getObjectVolatile, so it still observes the most recently published nodes and values.
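A minimal sketch of what the lock-free read path means in practice (the thread structure and values are my own illustration): the reader never blocks on the writer, and because put publishes through volatile writes, the reader eventually observes the final value.

```java
import java.util.concurrent.ConcurrentHashMap;

public class LockFreeGetDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("counter", 0);

        Thread writer = new Thread(() -> {
            for (int i = 1; i <= 1_000; i++) {
                map.put("counter", i);          // writer locks only its segment
            }
        });
        Thread reader = new Thread(() -> {
            int last = 0;
            while (last < 1_000) {
                Integer v = map.get("counter"); // get never blocks on the writer
                if (v != null && v > last) last = v;
            }
            System.out.println("reader saw final value " + last);
        });

        writer.start();
        reader.start();
        writer.join();
        reader.join();
    }
}
```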
Finally, let's look at the size operation:

    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

size() returns the number of key/value pairs stored in the whole container. It does not start by locking: it first makes a few lock-free passes over the segments, accumulating each segment's count and summing the modCounts; if two consecutive passes produce the same modCount sum, nothing changed in between and that total is returned. Only if the sums keep differing (after RETRIES_BEFORE_LOCK retries) does it lock every segment, count once more under the locks, and release all the locks in the finally block. If the total overflows an int, Integer.MAX_VALUE is returned.
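A small usage sketch (the counts and sleep interval are arbitrary) showing that size() is safe to call while another thread keeps writing; each call returns a best-effort snapshot, normally without taking any locks:

```java
import java.util.concurrent.ConcurrentHashMap;

public class SizeDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) map.put(i, i);
        });
        writer.start();

        // size() may run concurrently with the writer; it retries its
        // lock-free count and only falls back to locking if the map keeps
        // changing between passes.
        while (writer.isAlive()) {
            System.out.println("current size: " + map.size());
            Thread.sleep(10);
        }
        writer.join();
        System.out.println("final size: " + map.size()); // 100000
    }
}
```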