Java語言中的無鎖化執行緒安全佇列
在併發程式設計中,有時候需要使用執行緒安全的佇列。對於執行緒安全的佇列,有兩種實現方式:一種是阻塞(加鎖),一種是非阻塞(無鎖)。對於無鎖化執行緒安全佇列,實現要基於兩個方面:原子性操作和記憶體訪問控制。說的淺顯一些,就是在JDK中,需要用到Unsafe類中的CAS操作,結合volatile關鍵字,來實現無鎖化執行緒安全佇列。具有代表性的就是ConcurrentLinkedQueue,當然還有其他的佇列,諸如ConcurrentSkipListSet、ConcurrentSkipListMap。下面就著重說一下ConcurrentLinkedQueue的實現。
ConcurrentLinkedQueue是一個基於連結串列的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部,當我們獲取一個元素的時候,它會返回頭部的元素。這是採用“wait-free”演算法(CAS)來實現的。
原始碼中的CAS操作和volatile關鍵字:
ConcurrentLinkedQueue的類圖如下:
從類圖中可以看出,ConcurrentLinkedQueue是由head節點和tail節點組成,每個節點(Node)由節點元素和指向下一個節點的引用(next)組成,節點與節點之間就是通過這個next連線在一起,從而形成一張連結串列結構的佇列。
要了解ConcurrentLinkedQueue是無鎖的執行緒安全佇列,要從入隊和出隊來分析。
入隊:
入隊就是將入隊的節點新增到佇列的尾部,ConcurrentLinkedQueue的入隊操作是方法offer(E e),該方法原始碼如下:
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
從原始碼中我們可以看出,入隊過程主要做兩件事情:第一件事情就是定位出尾節點,第二件事情就是使用CAS演算法將入隊節點設定成尾節點的next節點,如不成功則重試。其中 casTail(t, newNode)方法就是CAS操作:
出隊:
出佇列就是從佇列中返回一個節點元素,並且清空該節點對元素的引用。ConcurrentLinkedQueue的出隊操作是方法poll(),該方法原始碼如下:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
從原始碼中可以看出,出隊操作首先要獲取頭節點的元素,判斷該元素是否為空,如果為空,則說明另外一個執行緒已經進行了一次出隊操作並且取走該元素,如果不為空,則使用CAS的方式將頭節點的引用設定為null,如果CAS成功,則返回頭節點的元素,否則表示另外一個執行緒先於本執行緒做了CAS,導致該佇列發生了變化,需要重新獲取頭節點。其中casItem(item, null)方法和updateHead(h, ((q = p.next) != null) ? q : p)都屬於CAS操作。