1. 程式人生 > >Java並發-從同步容器到並發容器

Java並發-從同步容器到並發容器

最後一個元素 vpd key 指向 map容器 部分 bject 表頭 技術分享

引言

容器是Java基礎類庫中使用頻率最高的一部分,Java集合包中提供了大量的容器類來幫組我們簡化開發,我前面的文章中對Java集合包中的關鍵容器進行過一個系列的分析,但這些集合類都是非線程安全的,即在多線程的環境下,都需要其他額外的手段來保證數據的正確性,最簡單的就是通過synchronized關鍵字將所有使用到非線程安全的容器代碼全部同步執行。這種方式雖然可以達到線程安全的目的,但存在幾個明顯的問題:首先編碼上存在一定的復雜性,相關的代碼段都需要添加鎖。其次這種一刀切的做法在高並發情況下性能並不理想,基本相當於串行執行。JDK1.5中為我們提供了一系列的並發容器,集中在java.util.concurrent包下,用來解決這兩個問題,先從同步容器說起。

同步容器Vector和HashTable

為了簡化代碼開發的過程,早期的JDK在java.util包中提供了Vector和HashTable兩個同步容器,這兩個容器的實現和早期的ArrayList和HashMap代碼實現基本一樣,不同在於Vector和HashTable在每個方法上都添加了synchronized關鍵字來保證同一個實例同時只有一個線程能訪問,部分源碼如下:

//Vector
public synchronized int size() {};
public synchronized E get(int index) {};
//HashTable
public synchronized V put(K key, V value) {};

public synchronized V remove(Object key) {};
通過對每個方法添加synchronized,保證了多次操作的串行。這種方式雖然使用起來方便了,但並沒有解決高並發下的性能問題,與手動鎖住ArrayList和HashMap並沒有什麽區別,不論讀還是寫都會鎖住整個容器。其次這種方式存在另一個問題:當多個線程進行復合操作時,是線程不安全的。可以通過下面的代碼來說明這個問題:

public static void deleteVector(){
int index = vectors.size() - 1;
vectors.remove(index);
}
代碼中對Vector進行了兩步操作,首先獲取size,然後移除最後一個元素,多線程情況下如果兩個線程交叉執行,A線程調用size後,B線程移除最後一個元素,這時A線程繼續remove將會拋出索引超出的錯誤。

那麽怎麽解決這個問題呢?最直接的修改方案就是對代碼塊加鎖來防止多線程同時執行:

public static void deleteVector(){
synchronized (vectors) {
int index = vectors.size() - 1;
vectors.remove(index);
}
}
如果上面的問題通過加鎖來解決沒有太直觀的影響,那麽來看看對vectors進行叠代的情況:

public static void foreachVector(){
synchronized (vectors) {
for (int i = 0; i < vectors.size(); i++) {
System.out.println(vectors.get(i).toString());
}
}
}
為了避免多線程情況下在叠代的過程中其他線程對vectors進行了修改,就不得不對整個叠代過程加鎖,想象這麽一個場景,如果叠代操作非常頻繁,或者vectors元素很大,那麽所有的修改和讀取操作將不得不在鎖外等待,這將會對多線程性能造成極大的影響。那麽有沒有什麽方式能夠很好的對容器的叠代操作和修改操作進行分離,在修改時不影響容器的叠代操作呢?這就需要java.util.concurrent包中的各種並發容器了出場了。

並發容器CopyOnWrite

CopyOnWrite--寫時復制容器是一種常用的並發容器,它通過多線程下讀寫分離來達到提高並發性能的目的,和前面我們講解StampedLock時所用的解決方案類似:任何時候都可以進行讀操作,寫操作則需要加鎖。不同的是,在CopyOnWrite中,對容器的修改操作加鎖後,通過copy一個新的容器來進行修改,修改完畢後將容器替換為新的容器即可。

這種方式的好處顯而易見:通過copy一個新的容器來進行修改,這樣讀操作就不需要加鎖,可以並發讀,因為在讀的過程中是采用的舊的容器,即使新容器做了修改對舊容器也沒有影響,同時也很好的解決了叠代過程中其他線程修改導致的並發問題。

JDK中提供的並發容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通過CopyOnWriteArrayList的部分源碼來理解這種思想:

//添加元素
public boolean add(E e) {
//獨占鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//復制一個新的數組newElements
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//修改後指向新的數組
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
public E get(int index) {
//未加鎖,直接獲取
return get(getArray(), index);
}
代碼很簡單,在add操作中通過一個共享的ReentrantLock來獲取鎖,這樣可以防止多線程下多個線程同時修改容器內容。獲取鎖後通過Arrays.copyOf復制了一個新的容器,然後對新的容器進行了修改,最後直接通過setArray將原數組引用指向了新的數組,避免了在修改過程中叠代數據出現錯誤。get操作由於是讀操作,未加鎖,直接讀取就行。CopyOnWriteArraySet類似,這裏不做過多講解。

CopyOnWrite容器雖然在多線程下使用是安全的,相比較Vector也大大提高了讀寫的性能,但它也有自身的問題。

首先就是性能,在講解ArrayList的文章中提到過,ArrayList的擴容由於使用了Arrays.copyOf每次都需要申請更大的空間以及復制現有的元素到新的數組,對性能存在一定影響。CopyOnWrite容器也不例外,每次修改操作都會申請新的數組空間,然後進行替換。所以在高並發頻繁修改容器的情況下,會不斷申請新的空間,同時會造成頻繁的GC,這時使用CopyOnWrite容器並不是一個好的選擇。

其次還有一個數據一致性問題,由於在修改中copy了新的數組進行替換,同時舊數組如果還在被使用,那麽新的數據就不能被及時讀取到,這樣就造成了數據不一致,如果需要強數據一致性,CopyOnWrite容器也不太適合。

並發容器ConcurrentHashMap

ConcurrentHashMap容器相較於CopyOnWrite容器在並發加鎖粒度上有了更大一步的優化,它通過修改對單個hash桶元素加鎖的達到了更細粒度的並發控制。在了解ConcurrentHashMap容器之前,推薦大家先閱讀我之前對HashMap源碼分析的文章--Java集合(5)一 HashMap與HashSet,因為在底層數據結構上,ConcurrentHashMap和HashMap都使用了數組+鏈表+紅黑樹的方式,只是在HashMap的基礎上添加了並發相關的一些控制,所以這裏只對ConcurrentHashMap中並發相關代碼做一些分析。
技術分享圖片
還是先從ConcurrentHashMap的寫操作開始,這裏就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //計算桶的hash值
int binCount = 0;
//循環插入元素,避免並發插入失敗
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果當前桶無元素,則通過cas操作插入新節點
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
//如果當前桶正在擴容,則協助擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//hash沖突時鎖住當前需要添加節點的頭元素,可能是鏈表頭節點或者紅黑樹的根節點
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
在put元素的過程中,有幾個並發處理的關鍵點:

如果當前桶對應的節點還沒有元素插入,通過典型的無鎖cas操作嘗試插入新節點,減少加鎖的概率,並發情況下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。
如果當前桶正在擴容,則協助擴容((fh = f.hash) == MOVED)。這裏是一個重點,ConcurrentHashMap的擴容和HashMap不一樣,它在多線程情況下或使用多個線程同時擴容,每個線程擴容指定的一部分hash桶,當前線程擴容完指定桶之後會繼續獲取下一個擴容任務,直到擴容全部完成。擴容的大小和HashMap一樣,都是翻倍,這樣可以有效減少移動的元素數量,也就是使用2的冪次方的原因,在HashMap中也一樣。
在發生hash沖突時僅僅只鎖住當前需要添加節點的頭元素即可,可能是鏈表頭節點或者紅黑樹的根節點,其他桶節點都不需要加鎖,大大減小了鎖粒度。
通過ConcurrentHashMap添加元素的過程,知道了ConcurrentHashMap容器是通過CAS + synchronized一起來實現並發控制的。這裏有個額外的問題:為什麽使用synchronized而不使用ReentrantLock?前面我的文章也對synchronized以及ReentrantLock的實現方式和性能做過分析,在這裏我的理解是synchronized在後期優化空間上比ReentrantLock更大。

並發容器ConcurrentSkipListMap

java.util中對應的容器在java.util.concurrent包中基本都可以找到對應的並發容器:List和Set有對應的CopyOnWriteArrayList與CopyOnWriteArraySet,HashMap有對應的ConcurrentHashMap,但是有序的TreeMap或並沒有對應的ConcurrentTreeMap。

為什麽沒有ConcurrentTreeMap呢?這是因為TreeMap內部使用了紅黑樹來實現,紅黑樹是一種自平衡的二叉樹,當樹被修改時,需要重新平衡,重新平衡操作可能會影響樹的大部分節點,如果並發量非常大的情況下,這就需要在許多樹節點上添加互斥鎖,那並發就失去了意義。所以提供了另外一種並發下的有序map實現:ConcurrentSkipListMap。

ConcurrentSkipListMap內部使用跳表(SkipList)這種數據結構來實現,他的結構相對紅黑樹來說非常簡單理解,實現起來也相對簡單,而且在理論上它的查找、插入、刪除時間復雜度都為log(n)。在並發上,ConcurrentSkipListMap采用無鎖的CAS+自旋來控制。

跳表簡單來說就是一個多層的鏈表,底層是一個普通的鏈表,然後逐層減少,通常通過一個簡單的算法實現每一層元素是下一層的元素的二分之一,這樣當搜索元素時從最頂層開始搜索,可以說是另一種形式的二分查找。

一個簡單的獲取跳表層數概率算法實現如下:

int random_level() {
K = 1;
while (random(0,1))
K++;

return K;
}
通過簡單的0和1獲取概率,1層的概率為50%,2層的概率為25%,3層的概率為12.5%,這樣逐級遞減。

一個三層的跳表添加元素的過程如下:
技術分享圖片
插入值為15的節點:
技術分享圖片
插入後:
技術分享圖片
維基百科中有一個添加節點的動圖,這裏也貼出來方便理解:
技術分享圖片
通過分析ConcurrentSkipListMap的put方法來理解跳表以及CAS自旋並發控制:

private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前繼節點
if (n != null) { //查找到前繼節點
Object v; int c;
Node<K,V> f = n.next; //獲取後繼節點的後繼節點
if (n != b.next) //發生競爭,兩次節點獲取不一致,並發導致
break;
if ((v = n.value) == null) { // 節點已經被刪除
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n)
break;
if ((c = cpr(cmp, key, n.key)) > 0) { //進行下一輪查找,比當前key大
b = n;
n = f;
continue;
}
if (c == 0) { //相等時直接cas修改值
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
z = new Node<K,V>(key, value, n); //9. n.key > key > b.key
if (!b.casNext(n, z)) //cas修改值
break; // restart if lost race to append to b
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed(); //獲取隨機數
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0) // 獲取跳表層級
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) { //如果獲取的調表層級小於等於當前最大層級,則直接添加,並將它們組成一個上下的鏈表
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level //否則增加一層level,在這裏體現為Index<K,V>數組
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j) //新添加的level層的具體數據
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 逐層插入數據過程
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
這裏的插入方法很復雜,可以分為3大步來理解:第一步獲取前繼節點後通過CAS來插入節點;第二步對level層數進行判斷,如果大於最大層數,則插入一層;第三步插入對應層的數據。整個插入過程全部通過CAS自旋的方式保證並發情況下的數據正確性。

總結

JDK中提供了豐富的並發容器供我們使用,文章中介紹的也並不全面,重點是要通過了解各種並發容器的原理,明白他們各自獨特的使用場景。這裏簡單做個總結:當並發讀遠多於修改的場景下需要使用List和Set時,可以考慮使用CopyOnWriteArrayList和CopyOnWriteArraySet;當需要並發使用<Key, Value>鍵值對存取數據時,可以使用ConcurrentHashMap;當要保證並發<Key, Value>鍵值對有序時可以使用ConcurrentSkipListMap。

Java並發-從同步容器到並發容器