ConcurrentHashMap原始碼之put和get方法
以下ConcurrentHashMap類是基於jdk1.7來分析。
JDK1.7中ConcurrentHashMap是通過segments陣列和HashEntry陣列+連結串列來進行實現的。利用鎖分段技術,支援任務數量執行緒的讀和一定數量執行緒的寫。
我們看下ConcurrentHashMap是怎麼進行put和get操作的。
1、ConcurrentHashMap的put方法不能插入null值(為什麼?自行百度),在put kv值時,首先取key的hash值,通過hash值判斷key所在的segment,然後使用unsafe類的本地方法獲取此segments陣列中hash值對應的segment是否為null(為什麼用unsafe類呢?因為需要獲取記憶體中最新的儲存值,
//ConcurrentHashMap的put方法 public V put(K key, V value) { Segment<K,V> s; //value值為null,直接報異常 if (value == null) throw new NullPointerException(); //兩次hash,獲得key的雜湊值 int hash = hash(key); //對hash值的高位和segmentMask掩碼做按位與,確定key所在的segment(segmentMask=segment的長度-1) int j = (hash >>> segmentShift) & segmentMask; //通過Unsafe類獲取segments陣列中下標為j的元素,如果不存在就初始化segment。(SSHIFT和SBASE均為確定陣列元素的記憶體位置,見以下變數宣告和static塊初始化) if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long SBASE; private static final int SSHIFT; private static final long TBASE; private static final int TSHIFT; private static final long HASHSEED_OFFSET; private static final long SEGSHIFT_OFFSET; private static final long SEGMASK_OFFSET; private static final long SEGMENTS_OFFSET; static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc); SBASE = UNSAFE.arrayBaseOffset(sc); ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); }
ConcurrentHashMap的ensureSegment方法,獲取下標為 j 的segment元素方法,如果不存在則初始化。
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; //獲取k的位置偏移量 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //驗證ss陣列中u處的元素是否為null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //獲取下標為0處的segment,目的是以第一個segment為模板,建立segment元素 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //再次驗證sh陣列中偏移量為u的元素是否為null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //建立segment元素 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //利用unsafe類,迴圈原子的去設定u處的segment的值為新segment元素,設定成功則返回,如果此時有其他執行緒已經建立了u處的segment元素,則也返回。 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
通過ConcurrentHashMap的put方法發現,定位到segment後,主要呼叫segment的put方法來操作,以下是segment的put(K key, int hash, V value, boolean onlyIfAbsent)方法原始碼。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
//走到這裡說明,已經獲取到了segment上的鎖。以下的操作均是在加鎖的情況下進行。
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//獲取index處最新的頭結點
HashEntry<K,V> first = entryAt(tab, index);
//獲取到index處的HashEntry頭節點後,迴圈去查詢Key
for (HashEntry<K,V> e = first;;) {
//查詢Key是否已經存在,如果存在則用新值替換舊值。
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
//走到這裡,說明連結串列中不存在此Key。
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//擴容,重新hash
rehash(node);
else
//將新結點插入到Index處,內部使用unsafe類操作。
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
從以上原始碼可以看出,segment的put方法首先去獲取此segment上的鎖。然後獲取index處最新的頭結點,為什麼不直接tab[index]取出頭結點呢,因為可能有其他執行緒修改過此index處的entry連結串列。對index處的連結串列迴圈,包含兩部分,第一個if中說明如果key已經在此連結串列中存在了,則用新value替換oldValue,返回oldValue;進入第二個if說明在連結串列中沒有找到對應的key值,則將新節點插入到頭結點之前,新節點當做頭結點,使用unsafe類更新table中的entry值。最後finally中,當key,value值設定完畢,直接解鎖。
為什麼第一個if中直接用e.value = value操作來更新,而第二個if中不用tab[index]=node來更新呢? 這需要看HashEntry節點的組成。jdk1.7中HashEntry節點原始碼如下,value是volatile變數,當value改變時,其他執行緒可以獲取到最新value,而HashEntry不是volatile修飾的,直接賦值其他執行緒不能看到最新的value值,所以需要藉助unsafe類來操作。
//HashEntry類
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
從put操作的分析可以知道,ConcurrentHashMap的讀操作為什麼不需要加鎖了。在put操作時,無論是替換key的value值還是新增key value值,都能保證將最新值更新到主記憶體中。
在segment的put方法中,還有兩個方法,一個是獲取鎖時的tryLock方法和scanAndLockForPut方法,我們繼續看下這兩個方法。tryLock直接呼叫ReentrantLock的tryLock方法,此方法成功更新AQS中的同步狀態則表明獲取到鎖,返回true,否則返回false。具體可以參考這裡。
如果為false,則呼叫segment的scanAndLockForPut方法,此方法原始碼如下。
//segment的scanAndLockForPut方法
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//根據segment和hash值獲取對應key的HashEntry(頭結點)。
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
//第一個if確定key是否在此entry連結串列中存在,不存在則建立新節點。存在時,開始retry。
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//第二個if判斷retry是否停止,阻塞等待獲取鎖。
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
//retry次數為偶數時,判斷頭結點是否有變化,有變化則變數歸位,重新迴圈。
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
scanAndLockForPut方法主要功能是獲取segment的鎖,返回值:key在segment中不存在則返回新節點,存在則返回null。此方法並沒有直接呼叫lock阻塞等待,而是在等待鎖可用的同時建立對應的節點。此方法會在兩種情況下返回:一是tryLock獲取到鎖,二是retry達到一定次數後,lock阻塞等待獲取到鎖。while迴圈中有3個if,第一個if(retries < 0) 是為了確定key是否在segment中存在,如果不存在則建立新的節點,存在時,開始retry,並沒有返回找到的節點(因為在外層會再次定位key,此時返回的可能是過期值)。第一個if確定後,第二個if是重試次數++,超過一定次數後,直接阻塞等待;第三個if是如果重試次數是偶數,則重新去記憶體中取最新的頭結點,如果有變化(有其他執行緒插入了節點),則變數歸位,重新迴圈。
其中,entryForHash方法是根據segment和hash值獲取對應key的HashEntry。
2、以上分析了ConcurrentHashMap的put原始碼,現在再來看get原始碼。找到key對應的值,返回value,找不到返回null。get操作沒有加鎖,利用unsafe類和volatile修飾符來獲取最新的值(從主記憶體獲取那一刻是最新的,保證不是髒資料)。
get操作第一步取key的hash值,然後定位所在segment,並用unsafe類取最新的segment,最後定位到Key所在的HashEntry頭結點,迴圈連結串列獲取key對應的value值。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
參考: