Java集合框架閱讀筆記(三)ConcurrentHashMap
阿新 • • 發佈:2018-08-12
類繼承 only d+ nan next related ati null lur
預備知識
- AQS(AbstractQueuedSynchronizer):提供了一個框架用來構造同步一些工具類比如ReentrantLock、 CopyOnWriteArrayList、 CountDownLatch、Semaphore等。通過維護一個volatile修飾的變量state表示鎖的獲取情況和一個FIFO隊列用來存儲線程。
API:
Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues. This class is designed to
be a useful basis for most kinds of synchronizers that rely on a
single atomic {@code int} value to represent state.
ConcurrentHashMap
介紹
- ConcurrentHashMap是一個線程安全的HashMap,繼承體系如下
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
JDK8和JDK7的區別
存儲結構
- JDK8存儲結構采用Node類型數組加鏈表並通過拉鏈法解決hash沖突而JDK7基於AQS實現了一個Segment類繼承自ReentrantLock,每個Segment對象持有一個HashEntry數組。Node和HashEntry都是實現了Map.Entry接口,只不過名字不同。
並發更新操作的實現
- JDK8通過synchronized+CAS機制進行並發更新,鎖對象是數組下標對應的元素持有的Monitor。JDK7繼承了AQS裏的ReentrantLock進行加鎖實現的並發更新。
計算size
- JDK8通過維護一個由volatile修飾的baseCount變量進行計數,以及一個CounterCell類進行記錄變化的次數來確定size。JDK7采用延遲計算,在計算過程中會對每個Segment計算至少兩次,如果出現數據不一致現象就進行全部加鎖最後求得size。
部分源碼分析
public V put(K key, V value)
直接調用
putVal(K key, V value, boolean onlyIfAbsent)
public V put(K key, V value) {
return putVal(key, value, false);
}
- JDK8
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 預處理
if (key == null || value == null) throw new NullPointerException();
// 為了分布均勻對key的hashcode再hash
int hash = spread(key.hashCode());
// 根據binCount通過計算鏈表長度,判斷鏈表是否修改過,如果鏈表的數量超過設定的閾值,插入完成之後需要變成紅黑樹
int binCount = 0;
// 初始化tab,死循環直到插入成功
for (Node<K,V>[] tab = table;;) {
// 指向目標節點
Node<K,V> f;
int n, i, fh;
// 延遲加載:如果沒有初始化進行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果數組下標對應的元素為null,通過CAS放到當前數組下標處
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果當前節點的hash值為MOVED常量,則表示這個ConcurrentHashMap正在進行擴容,則當前線程幫助擴容helpTransfer(tab,f)。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 當前數組下標有元素,並且可以正常插入,則根據節點元素類型是紅黑樹節點還是鏈表節點然後執行插入
else {
V oldVal = null;
// 加鎖,Monitor對象是當前數組下標元素持有的Monitor
synchronized (f) {
// tabAt(tab,i)會通過Unsafe類求出數組下標元素,再次判斷確保是否是上面所確定的f
if (tabAt(tab, i) == f) {
// 代表是鏈表元素,執行插入鏈表操作
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果已經存在此key,則更新即可
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 不存在,則插入到鏈表尾部
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 當前數組下標元素類型是紅黑樹,執行紅黑樹插入操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 根據binCount判斷鏈表元素個數
if (binCount != 0) {
// 超過閾值,則將鏈表變成紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 將當前維護的baseCount + 1
addCount(1L, binCount);
return null;
}
- JDK7
public V put(K key, V value) {
Segment<K, V> s;
if (value == null) throw new NullPointerException();
int hash = hash(key);
// UNSAFE通過hash找到Segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K, V>) UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加鎖,如果加鎖成功則執行後續操作,沒有競爭到鎖就需要自旋直到獲取到鎖並執行插入操作。
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 找到引用的數組
HashEntry<K,V>[] tab = table;
// 求模計算數組下標
int index = (tab.length - 1) & hash;
// 找到數組第一個元素
HashEntry<K,V> first = entryAt(tab, index);
// 死循環,直到插入成功
for (HashEntry<K,V> e = first;;) {
// 數組對應下標已經存在數值。
if (e != null) {
K k;
// 已經存在此key,進行更新
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
// 記錄每一個Segment中的HashEntry數組中元素的數量
++modCount;
}
break;
}
e = e.next;
// 如果當前數組下標已經存在元素
} else {
// 當前線程可能自旋過(自旋過程可能會返回一個已經初始化的node),所以需要判斷node是否為null
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 判斷是否需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 釋放鎖
unlock();
}
return oldValue;
}
public V get(Object key)
- JDK8
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 在紅黑樹中查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 在鏈表中查找
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- JDK7
public V get(Object key) {
Segment<K, V> s; // manually integrate access methods to reduce overhead
HashEntry<K, V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 通過UNSAFE獲取Segment,通過Segment獲取HashEntry數組引用,進而找到指定的key
if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
public int size()
- JDK8
通過由
volatile
修飾的baseCount
變量以及CounterCell
對象記錄變化的次數求出size()
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 獲取baseCount
long sum = baseCount;
if (as != null) {
// 循環遍歷每一個CountCell中的value
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
- JDK7
通過計算Segment數組中每一個Segment中的modcount(在put時會++modCount)求出sum。但是不止求一次sum,會至少求兩次(由
RETRIES_BEFORE_LOCK
決定的),如果兩次求出的sum不一致,下次求就會將Segment數組全部加鎖,重新求一次。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K, V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn‘t retry
try {
for (; ; ) {
// 超出兩次,將Segment全部加鎖
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K, V> seg = segmentAt(segments, j);
if (seg != null) {
// 獲取Segment對象的modCount
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 和上一次求的結果比較
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
參考
- https://www.jianshu.com/p/e694f1e868ec
- http://penghb.com/2017/10/27/java/concurrentHashMap/
- http://www.importnew.com/26049.html
- https://swenfang.github.io/2018/06/03/Java%208%20ConcurrentHashMap%20%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB/
Java集合框架閱讀筆記(三)ConcurrentHashMap