Java多執行緒(六)之Deque與LinkedBlockingDeque深入分析
1、LinkedBlockingDeque資料結構
雙向併發阻塞佇列。所謂雙向是指可以從佇列的頭和尾同時操作,併發只是執行緒安全的實現,阻塞允許在入隊出隊不滿足條件時掛起執行緒,這裡說的佇列是指支援FIFO/FILO實現的連結串列。
首先看下LinkedBlockingDeque的資料結構。通常情況下從資料結構上就能看出這種實現的優缺點,這樣就知道如何更好的使用工具了。
從資料結構和功能需求上可以得到以下結論:
- 要想支援阻塞功能,佇列的容量一定是固定的,否則無法在入隊的時候掛起執行緒。也就是capacity是final型別的。
- 既然是雙向連結串列,每一個結點就需要前後兩個引用,這樣才能將所有元素串聯起來,支援雙向遍歷。也即需要prev/next兩個引用。
- 雙向連結串列需要頭尾同時操作,所以需要first/last兩個節點,當然可以參考LinkedList那樣採用一個節點的雙向來完成,那樣實現起來就稍微麻煩點。
- 既然要支援阻塞功能,就需要鎖和條件變數來掛起執行緒。這裡使用一個鎖兩個條件變數來完成此功能。
2、LinkedBlockingDeque原始碼分析
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { /** 包含前驅和後繼節點的雙向鏈式結構 */ static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } } /** 頭節點 */ private transient Node<E> first; /** 尾節點 */ private transient Node<E> last; /** 元素個數*/ private transient int count; /** 佇列容量 */ private final int capacity; /** 鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** notEmpty條件 */ private final Condition notEmpty = lock.newCondition(); /** notFull條件 */ private final Condition notFull = lock.newCondition(); /** 構造方法 */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); for (E e : c) add(e); } /** * 新增元素作為新的頭節點 */ private boolean linkFirst(E e) { if (count >= capacity) return false; ++count; Node<E> f = first; Node<E> x = new Node<E>(e, null, f); first = x; if (last == null) last = x; else f.prev = x; notEmpty.signal(); return true; } /** * 新增尾元素 */ private boolean linkLast(E e) { if (count >= capacity) return false; ++count; Node<E> l = last; Node<E> x = new Node<E>(e, l, null); last = x; if (first == null) first = x; else l.next = x; notEmpty.signal(); return true; } /** * 返回並移除頭節點 */ private E unlinkFirst() { Node<E> f = first; if (f == null) return null; Node<E> n = f.next; first = n; if (n == null) last = null; else n.prev = null; --count; notFull.signal(); return f.item; } /** * 返回並移除尾節點 */ private E unlinkLast() { Node<E> l = last; if (l == null) return null; Node<E> p = l.prev; last = p; if (p == null) first = null; else p.next = null; --count; notFull.signal(); return l.item; } /** * 移除節點x */ private void unlink(Node<E> x) { Node<E> p = x.prev; Node<E> n = x.next; if (p == null) {//x是頭的情況 if (n == null) first = last = null; else { n.prev = null; first = n; } } else if (n == null) {//x是尾的情況 p.next = null; last = p; } else {//x是中間的情況 p.next = n; n.prev = p; } --count; notFull.signalAll(); } //--------------------------------- BlockingDeque 雙端阻塞佇列方法實現 public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); lock.lock(); try { return linkFirst(e); } finally { lock.unlock(); } } public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); lock.lock(); try { return linkLast(e); } finally { lock.unlock(); } } public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); lock.lock(); try { while (!linkFirst(e)) notFull.await(); } finally { lock.unlock(); } } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); lock.lock(); try { while (!linkLast(e)) notFull.await(); } finally { lock.unlock(); } } public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { if (linkFirst(e)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { if (linkLast(e)) return true; if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } } finally { lock.unlock(); } } public E removeFirst() { E x = pollFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E removeLast() { E x = pollLast(); if (x == null) throw new NoSuchElementException(); return x; } public E pollFirst() { lock.lock(); try { return unlinkFirst(); } finally { lock.unlock(); } } public E pollLast() { lock.lock(); try { return unlinkLast(); } finally { lock.unlock(); } } public E takeFirst() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { E x = unlinkFirst(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); lock.lockInterruptibly(); try { for (;;) { E x = unlinkLast(); if (x != null) return x; if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } } finally { lock.unlock(); } } public E getFirst() { E x = peekFirst(); if (x == null) throw new NoSuchElementException(); return x; } public E getLast() { E x = peekLast(); if (x == null) throw new NoSuchElementException(); return x; } public E peekFirst() { lock.lock(); try { return (first == null) ? null : first.item; } finally { lock.unlock(); } } public E peekLast() { lock.lock(); try { return (last == null) ? null : last.item; } finally { lock.unlock(); } } public boolean removeFirstOccurrence(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } public boolean removeLastOccurrence(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = last; p != null; p = p.prev) { if (o.equals(p.item)) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } //---------------------------------- BlockingQueue阻塞佇列 方法實現 public boolean add(E e) { addLast(e); return true; } public boolean offer(E e) { return offerLast(e); } public void put(E e) throws InterruptedException { putLast(e); } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { return offerLast(e, timeout, unit); } public E remove() { return removeFirst(); } public E poll() { return pollFirst(); } public E take() throws InterruptedException { return takeFirst(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { return pollFirst(timeout, unit); } public E element() { return getFirst(); } public E peek() { return peekFirst(); } //------------------------------------------- Stack 方法實現 public void push(E e) { addFirst(e); } public E pop() { return removeFirst(); } //------------------------------------------- Collection 方法實現 public boolean remove(Object o) { return removeFirstOccurrence(o); } public int size() { lock.lock(); try { return count; } finally { lock.unlock(); } } public boolean contains(Object o) { if (o == null) return false; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { lock.unlock(); } } boolean removeNode(Node<E> e) { lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) { if (p == e) { unlink(p); return true; } } return false; } finally { lock.unlock(); } } …… }
3、LinkedBlockingDeque的優缺點
有了上面的結論再來研究LinkedBlockingDeque的優缺點。
優點當然是功能足夠強大,同時由於採用一個獨佔鎖,因此實現起來也比較簡單。所有對佇列的操作都加鎖就可以完成。同時獨佔鎖也能夠很好的支援雙向阻塞的特性。
凡事有利必有弊。缺點就是由於獨佔鎖,所以不能同時進行兩個操作,這樣效能上就大打折扣。從效能的角度講LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,這在高併發情況下就比較明顯了。
前面分析足夠多的Queue實現後,LinkedBlockingDeque的原理和實現就不值得一提了,無非是在獨佔鎖下對一個連結串列的普通操作。
4、LinkedBlockingDeque的序列化、反序列化
有趣的是此類支援序列化,但是Node並不支援序列化,因此fist/last就不能序列化,那麼如何完成序列化/反序列化過程呢?
清單4 LinkedBlockingDeque的序列化、反序列化
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { lock.lock(); try { // Write out capacity and any hidden stuff s.defaultWriteObject(); // Write out all elements in the proper order. for (Node<E> p = first; p != null; p = p.next) s.writeObject(p.item); // Use trailing null as sentinel s.writeObject(null); } finally { lock.unlock(); } } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); count = 0; first = null; last = null; // Read in all elements and place in queue for (;;) { E item = (E)s.readObject(); if (item == null) break; add(item); } }
清單4 描述的是LinkedBlockingDeque序列化/反序列化的過程。序列化時將真正的元素寫入輸出流,最後還寫入了一個null。讀取的時候將所有物件列表讀出來,如果讀取到一個null就表示結束。這就是為什麼寫入的時候寫入一個null的原因,因為沒有將count寫入流,所以就靠null來表示結束,省一個整數空間。