1. 程式人生 > >Java多執行緒(六)之Deque與LinkedBlockingDeque深入分析

Java多執行緒(六)之Deque與LinkedBlockingDeque深入分析

1、LinkedBlockingDeque資料結構

雙向併發阻塞佇列。所謂雙向是指可以從佇列的頭和尾同時操作,併發只是執行緒安全的實現,阻塞允許在入隊出隊不滿足條件時掛起執行緒,這裡說的佇列是指支援FIFO/FILO實現的連結串列。

首先看下LinkedBlockingDeque的資料結構。通常情況下從資料結構上就能看出這種實現的優缺點,這樣就知道如何更好的使用工具了。


從資料結構和功能需求上可以得到以下結論:

  1. 要想支援阻塞功能,佇列的容量一定是固定的,否則無法在入隊的時候掛起執行緒。也就是capacity是final型別的。
  2. 既然是雙向連結串列,每一個結點就需要前後兩個引用,這樣才能將所有元素串聯起來,支援雙向遍歷。也即需要prev/next兩個引用。
  3. 雙向連結串列需要頭尾同時操作,所以需要first/last兩個節點,當然可以參考LinkedList那樣採用一個節點的雙向來完成,那樣實現起來就稍微麻煩點。
  4. 既然要支援阻塞功能,就需要鎖和條件變數來掛起執行緒。這裡使用一個鎖兩個條件變數來完成此功能。

2、LinkedBlockingDeque原始碼分析

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>,  java.io.Serializable {
    /** 包含前驅和後繼節點的雙向鏈式結構 */
    static final class Node<E> {
 E item;
        Node<E> prev;
        Node<E> next;
        Node(E x, Node<E> p, Node<E> n) {
            item = x;
            prev = p;
            next = n;
        }
    }
    /** 頭節點 */
    private transient Node<E> first;
    /** 尾節點 */
    private transient Node<E> last;
    /** 元素個數*/
    private transient int count;
    /** 佇列容量 */
    private final int capacity;
    /** 鎖 */
    private final ReentrantLock lock = new ReentrantLock();
    /** notEmpty條件 */
    private final Condition notEmpty = lock.newCondition();
    /** notFull條件 */
    private final Condition notFull = lock.newCondition();
    /** 構造方法 */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }

    /**
     * 新增元素作為新的頭節點
     */
    private boolean linkFirst(E e) {
        if (count >= capacity)
            return false;
        ++count;
        Node<E> f = first;
        Node<E> x = new Node<E>(e, null, f);
        first = x;
        if (last == null)
            last = x;
        else
            f.prev = x;
        notEmpty.signal();
        return true;
    }
    /**
     * 新增尾元素
     */
    private boolean linkLast(E e) {
        if (count >= capacity)
            return false;
        ++count;
        Node<E> l = last;
        Node<E> x = new Node<E>(e, l, null);
        last = x;
        if (first == null)
            first = x;
        else
            l.next = x;
        notEmpty.signal();
        return true;
    }
    /**
     * 返回並移除頭節點
     */
    private E unlinkFirst() {
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return f.item;
    }
    /**
     * 返回並移除尾節點
     */
    private E unlinkLast() {
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        notFull.signal();
        return l.item;
    }
    /**
     * 移除節點x
     */
    private void unlink(Node<E> x) {
        Node<E> p = x.prev;
        Node<E> n = x.next;
        if (p == null) {//x是頭的情況
            if (n == null)
                first = last = null;
            else {
                n.prev = null;
                first = n;
            }
        } else if (n == null) {//x是尾的情況
            p.next = null;
            last = p;
        } else {//x是中間的情況
            p.next = n;
            n.prev = p;
        }
        --count;
        notFull.signalAll();
    }
    //--------------------------------- BlockingDeque 雙端阻塞佇列方法實現
    public void addFirst(E e) {
        if (!offerFirst(e))
            throw new IllegalStateException("Deque full");
    }
    public void addLast(E e) {
        if (!offerLast(e))
            throw new IllegalStateException("Deque full");
    }
    public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            return linkFirst(e);
        } finally {
            lock.unlock();
        }
    }
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            return linkLast(e);
        } finally {
            lock.unlock();
        }
    }
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            while (!linkFirst(e))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try {
            while (!linkLast(e))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    public boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
 long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (linkFirst(e))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
        } finally {
            lock.unlock();
        }
    }
    public boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
 long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (linkLast(e))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
        } finally {
            lock.unlock();
        }
    }
    public E removeFirst() {
        E x = pollFirst();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    public E removeLast() {
        E x = pollLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    public E pollFirst() {
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }
    public E pollLast() {
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }
    public E takeFirst() throws InterruptedException {
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    public E takeLast() throws InterruptedException {
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    public E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException {
 long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = unlinkFirst();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
        } finally {
            lock.unlock();
        }
    }
    public E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException {
 long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = unlinkLast();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
        } finally {
            lock.unlock();
        }
    }
    public E getFirst() {
        E x = peekFirst();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    public E getLast() {
        E x = peekLast();
        if (x == null) throw new NoSuchElementException();
        return x;
    }
    public E peekFirst() {
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }
    public E peekLast() {
        lock.lock();
        try {
            return (last == null) ? null : last.item;
        } finally {
            lock.unlock();
        }
    }
    public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    public boolean removeLastOccurrence(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            for (Node<E> p = last; p != null; p = p.prev) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    //---------------------------------- BlockingQueue阻塞佇列 方法實現
    public boolean add(E e) {
 addLast(e);
 return true;
    }
    public boolean offer(E e) {
 return offerLast(e);
    }
    public void put(E e) throws InterruptedException {
 putLast(e);
    }
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
 return offerLast(e, timeout, unit);
    }
    public E remove() {
 return removeFirst();
    }
    public E poll() {
 return pollFirst();
    }
    public E take() throws InterruptedException {
 return takeFirst();
    }
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 return pollFirst(timeout, unit);
    }
    public E element() {
 return getFirst();
    }
    public E peek() {
 return peekFirst();
    }
    //------------------------------------------- Stack 方法實現
    public void push(E e) {
 addFirst(e);
    }
    public E pop() {
 return removeFirst();
    }
    //------------------------------------------- Collection 方法實現
    public boolean remove(Object o) {
 return removeFirstOccurrence(o);
    }
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    public boolean contains(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            for (Node<E> p = first; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            lock.unlock();
        }
    }
    boolean removeNode(Node<E> e) {
        lock.lock();
        try {
            for (Node<E> p = first; p != null; p = p.next) {
                if (p == e) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
  ……
}


3、LinkedBlockingDeque的優缺點

有了上面的結論再來研究LinkedBlockingDeque的優缺點。

優點當然是功能足夠強大,同時由於採用一個獨佔鎖,因此實現起來也比較簡單。所有對佇列的操作都加鎖就可以完成。同時獨佔鎖也能夠很好的支援雙向阻塞的特性。

凡事有利必有弊。缺點就是由於獨佔鎖,所以不能同時進行兩個操作,這樣效能上就大打折扣。從效能的角度講LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,這在高併發情況下就比較明顯了。

前面分析足夠多的Queue實現後,LinkedBlockingDeque的原理和實現就不值得一提了,無非是在獨佔鎖下對一個連結串列的普通操作。

4、LinkedBlockingDeque的序列化、反序列化

有趣的是此類支援序列化,但是Node並不支援序列化,因此fist/last就不能序列化,那麼如何完成序列化/反序列化過程呢?

清單4 LinkedBlockingDeque的序列化、反序列化

private void writeObject(java.io.ObjectOutputStream s)
    throws java.io.IOException {
    lock.lock();
    try {
        // Write out capacity and any hidden stuff
        s.defaultWriteObject();
        // Write out all elements in the proper order.
        for (Node<E> p = first; p != null; p = p.next)
            s.writeObject(p.item);
        // Use trailing null as sentinel
        s.writeObject(null);
    } finally {
        lock.unlock();
    }
}

private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    s.defaultReadObject();
    count = 0;
    first = null;
    last = null;
    // Read in all elements and place in queue
    for (;;) {
        E item = (E)s.readObject();
        if (item == null)
            break;
        add(item);
    }
}

 


清單4 描述的是LinkedBlockingDeque序列化/反序列化的過程。序列化時將真正的元素寫入輸出流,最後還寫入了一個null。讀取的時候將所有物件列表讀出來,如果讀取到一個null就表示結束。這就是為什麼寫入的時候寫入一個null的原因,因為沒有將count寫入流,所以就靠null來表示結束,省一個整數空間。