CocurrentHashMap實現原理及原始碼解析
##1、CocurrentHashMap概念
CocurrentHashMap是jdk中的容器,是hashmap的一個提升,結構圖:
這裡對比在對比hashmap的結構:
可以看出CocurrentHashMap對比HashMap在HashEnty前面加了Segment段,因為HashMap不是執行緒安全的,並且在多執行緒同時寫入的情況下會導致死迴圈,所以先多執行緒的環境下,一般不實用HashMap,而使用CocurrentHashMap,CocurrentHashMap通過分段鎖的機制,實現了多執行緒寫入時的執行緒安全,也提高了多執行緒情況下的訪問效率。
##2、通過原始碼分析CocurrentHashMap的實現
首先看建構函式:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
HashMap 的構造方法中有兩個引數,而ConcurrentHashMap這裡多了一個引數,三個引數分別是initialCapacity初始容量,loadFactor載入因子,concurrencyLevel並行度。而函式裡面也有幾個重要的引數,sshift,ssize ,segmentShift ,segmentMask ,cap。構造方法中首先判斷引數的有效性,然後通過並行度計算segment段陣列的大小ssize及sshift,段陣列大小是2的冪次方,這裡2的sshift等於ssize,同HashMap中陣列的大小一樣,在我的另一篇關於Hashmap的部落格中有詳細的解釋。確定了segment段陣列的大小後再確定HashEntry陣列的大小cap,cap也是2的冪次方且capssize是不小於initialCapacity的2的冪次方的數,最後先建立一個段,然後將這個段放入段陣列中創建出整個ConcurrentHashMap。從這裡就可以看出,原來的HashMap是HashEntry的大小就是cap
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
可以看到這裡面有三個引數,lf,threshold,tab,分別是載入因子,域和HashEntry表,可以看出其實每個segment裡面相當於就是一個小型的HashMap結構的容器。接下來看put操作:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
這個是ConcurrentHashMap對應的put操作,裡面首先檢查value是否為空,然後對key執行一次hash方法,利用hash值的高几位對segment段的長度減1取模得到segment段的索引,然後獲取到對應的segment元素,最後執行segment的put方法。接著看segment的put方法。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在這個方法中,首先嚐試獲取這個段的鎖,獲取到鎖後然後利用上面獲取到的hash值繼續與段裡面HashEntry長度減1相與得到HashEntry陣列的索引值,接著獲取到對應的HashEntry元素,如果這個HashEntry節點不為空,那麼就找是否有鍵值相同的節點,如果有那麼就替換,如果沒有或者這個HashEntry節點為空,那麼新建一個HashEntry節點,然後判斷HashEntry陣列不為空節點是否超過閾值,超過了就擴容這個段裡面的hash表,沒有的話就將這個節點查到對應HashEntry連結串列的頭部。最後再釋放鎖。
然後來看get操作:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在get操作中,首先實現了也是獲取hash值,然後找到對應的段,然後通過這個hash值再找到對應的HashEntry節點,在遍歷這個連結串列看能不能找到鍵值相同的節點,有的話就返回節點的值,沒有的話就返回空,注意get操作是沒有加鎖的,但是通過getObjectVolatile可以獲取到已經更新的節點的最新的值。
最後再來看size操作:
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
size操作返回容器中儲存的鍵值對的個數,在size操作中首先獲取到所有段的鎖,然後通過迴圈獲取到每個每個段中鍵值對數量的累積值,最後釋放鎖。