1. 程式人生 > >Java並發編程:淺析幾種線程安全模型 [轉]

Java並發編程:淺析幾種線程安全模型 [轉]

condition 神器 ans temp 簡單 nts 替換 line get()

多線程編程一直是老生常談的問題,在Java中,隨著JDK的逐漸發展,JDK提供給我們的並發模型也越來越多,本文摘取三例使用不同原理的模型,分析其大致原理。目錄如下:

1.COW之CopyOnWriteArrayList

2.CAS之ConcurrentHashMap

3.讀寫分離之LinkedBlockingQueue

COW之CopyOnWriteArrayList

cow是copy-on-write的簡寫,這種模型來源於linux系統fork命令,Java中一種使用cow模型來實現的並發類是CopyOnWriteArrayList。相比於Vector,它的讀操作是無需加鎖的:

1 2 3 public E get(int index) { return (E) elements[index]; }

之所以有如此神奇功效,其采取的是空間換取時間的方法,查看其add方法:

1 2 3 4 5 6 7 public synchronized boolean add(E e) { Object[] newElements = new Object[elements.length + 1]; System.arraycopy(elements, 0, newElements, 0, elements.length);
newElements[elements.length] = e; elements = newElements; return true; }

我們註意到,CopyOnWriteArrayList的add方法是需要加鎖的,但其內部並沒有直接對elements數組做操作,而是先copy一份當前的數據到一個新的數組,然後對新的數組進行賦值操作。這樣做就讓get操作從同步中解脫出來。因為更改的數據並沒有發生在get所需的數組中。而是放生在新生成的副本中,所以不需要同步。但應該註意的是,盡管如此,get操作還是可能會讀取到臟數據的。

CopyOnWriteArrayList的另一特點是允許多線程遍歷,且其它線程更改數據並不會導致遍歷線程拋出ConcurrentModificationException

異常,來看下iterator()

1 2 3 4 public Iterator<E> iterator() { Object[] snapshot = elements; return new CowIterator<E>(snapshot, 0, snapshot.length); }

這個CowIterator 是 ListIterator的子類,這個Iterator的特點是它並不支持對數據的更改操作:

1 2 3 4 5 6 7 8 9 public void add(E object) { throw new UnsupportedOperationException(); } public void remove() { throw new UnsupportedOperationException(); } public void set(E object) { throw new UnsupportedOperationException(); }

這樣做的原因也很容易理解,我們可以簡單地的認為CowIterator中的snapshot是不可變數組,因為list中有數據更新都會生成新數組,而不會改變snapshot, 所以此時Iterator沒辦法再將更改的數據寫回list了。同理,list數據有更新也不會反映在CowIterator中。CowIterator只是保證其叠代過程不會發生異常。

CAS之ConcurrentHashMap(JDK1.8)

CAS是Compare and Swap的簡寫,即比較與替換,CAS造作將比較和替換封裝為一組原子操作,不會被外部打斷。這種原子操作的保證往往由處理器層面提供支持。

在Java中有一個非常神奇的Unsafe類來對CAS提供語言層面的接口。但類如其名,此等神器如果使用不當,會造成武功盡失的,所以Unsafe不對外開放,想使用的話需要通過反射等技巧。這裏不對其做展開。介紹它的原因是因為它是JDK1.8中ConcurrentHashMap的實現基礎。

ConcurrentHashMapHashMap對數據的存儲有著相似的地方,都采用數組+鏈表+紅黑樹的方式。基本邏輯是內部使用Node來保存map中的一項key, value結構,對於hash不沖突的key,使用數組來保存Node數據,而每一項Node都是一個鏈表,用來保存hash沖突的Node,當鏈表的大小達到一定程度會轉為紅黑樹,這樣會使在沖突數據較多時也會有比較好的查詢效率。

了解了ConcurrentHashMap的存儲結構後,我們來看下在這種結構下,ConcurrentHashMap是如何實現高效的並發操作,這得益於ConcurrentHashMap中的如下三個函數。

1 2 3 4 5 6 7 8 9 10 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putOrderedObject(tab, ((long)i << ASHIFT) + ABASE, v); }

其中的U就是我們前文提到的Unsafe的一個實例,這三個函數都通過Unsafe的幾個方法保證了是原子性:

  • tabAt作用是返回tab數組第i項
  • casTabAt函數是對比tab第i項是否與c相等,相等的話將其設置為v。
  • setTabAt將tab的第i項設置為v

有了這三個函數就可以保證ConcurrentHashMap的線程安全嗎?並不是的,ConcurrentHashMap內部也使用比較多的synchronized,不過與HashTable這種對所有操作都使用synchronized不同,ConcurrentHashMap只在特定的情況下使用synchronized,來較少鎖的定的區域。來看下putVal方法(精簡版):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to embin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { .... } } } addCount(1L, binCount); return null; }

整個put流程大致如下:

  • 判斷key與value是否為空,為空拋異常
  • 計算kek的hash值,然後進入死循環,一般來講,caw算法與死循環是搭檔。
  • 判斷table是否初始化,未初始化進行初始化操作
  • Node在table中的目標位置是否為空,為空的話使用caw操作進行賦值,當然,這種賦值是有可能失敗的,所以前面的死循環發揮了重試的作用。
  • 如果當前正在擴容,則嘗試協助其擴容,死循環再次發揮了重試的作用,有趣的是ConcurrentHashMap是可以多線程同時擴容的。這裏說協助的原因在於,對於數組擴容,一般分為兩步:1.新建一個更大的數組;2.將原數組數據copy到新數組中。對於第一步,ConcurrentHashMap通過CAW來控制一個int變量保證新建數組這一步只會執行一次。對於第二步,ConcurrentHashMap采用CAW + synchronized + 移動後標記 的方式來達到多線程擴容的目的。感興趣可以查看transfer函數。
  • 最後的一個else分支,黑科技的流程已嘗試無效,目標Node已經存在值,只能鎖住當前Node來進行put操作,當然,這裏省略了很多代碼,包括鏈表轉紅黑樹的操作等等。

相比於put,get的代碼更好理解一下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
  • 檢查表是否為空
  • 獲取key的hash h,獲取key在table中對應的Node e
  • 判斷Node e的第一項是否與預期的Node相等,相等話, 則返回e.val
  • 如果e.hash < 0, 說明e為紅黑樹,調用e的find接口來進行查找。
  • 走到這一步,e為鏈表無疑,且第一項不是需要查詢的數據,一直調用next來進行查找即可。

讀寫分離之LinkedBlockingQueue

還有一種實現線程安全的方式是通過將讀寫進行分離,這種方式的一種實現是LinkedBlockingQueueLinkedBlockingQueue整體設計的也十分精巧,它的全局變量分為三類:

  • final 型
  • Atomic 型
  • 普通變量

final型變量由於聲明後就不會被修改,所以自然線程安全,Atomic型內部采用了cas模型來保證線程安全。對於普通型變量,LinkedBlockingQueue中只包含head與last兩個表示隊列的頭與尾。並且私有,外部無法更改,所以,LinkedBlockingQueue只需要保證head與last的安全即可保證真個隊列的線程安全。並且LinkedBlockingQueue屬於FIFO型隊列,一般情況下,讀寫會在不同元素上工作,所以, LinkedBlockingQueue定義了兩個可重入鎖,巧妙的通過對head與last分別加鎖,實現讀寫分離,來實現良好的安全並發特性:

1 2 3 4 5 6 7 8 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

首先看下它的offer 方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }

可見,在對隊列進行添加元素時,只需要對putLock進行加鎖即可,保證同一時刻只有一個線程可以對last進行插入。同樣的,在從隊列進行提取元素時,也只需要獲取takeLock鎖來對head操作即可:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
LinkedBlockingQueue整體還是比較好理解的,但有幾個點需要特殊註意:
  • LinkedBlockingQueue是一個阻塞隊列,當隊列無元素為空時,所有取元素的線程會通過notEmpty 的await()方法進行等待,直到再次有數據enqueue時,notEmpty發出signal信號。對於隊列達到上限時也是同理。
  • 對於remove,contains,toArray, toString, clear之類方法,會調用fullyLock方法,來同時獲取讀寫鎖。但對於size方法,由於隊列內部維護了AtomicInteger類型的count變量,是不需要加鎖進行獲取的。

本文轉自:http://www.importnew.com/27922.html

Java並發編程:淺析幾種線程安全模型 [轉]