1. 程式人生 > >3.Java資料結構原理解析-Queue系列

3.Java資料結構原理解析-Queue系列

Queue,也就是佇列,滿足FIFO的特性。
在Java中,Queue是一個介面,它的實現類有很多,其中非執行緒安全的代表是LinkedList,執行緒安全的有阻塞和非阻塞的,阻塞的大都實現了Queue的子介面BlockingQueue(阻塞佇列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。

Queue介面方法定義:

//新增元素,成功返回true,容量不夠拋IllegalStateException
boolean add(E e)
//新增元素,成功返回true,容量不足返回false boolean offer(E e) //移除隊首元素,佇列為空時拋NoSuchElementException E remove() //移除隊首元素,佇列為空時返回null E poll() //檢視隊首元素,佇列為空時拋NoSuchElementException E element() //檢視隊首元素,佇列為空時返回null E peek()

BlockingQueue介面定義(BlockingQueue除了繼承Queue定義的方法外,還加入了自己的阻塞方法):

//新增元素,容量不足阻塞
void put(E e) throws InterruptedException
//新增元素,容量不足時等待指定時間
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException //移除隊首元素,佇列為空時阻塞 E take() throws InterruptedException //移除隊首元素,佇列為空時等待指定時間 E poll(long timeout, TimeUnit unit) throws InterruptedException

佇列大多數是在多執行緒環境下使用的,生產者執行緒往佇列中新增元素,消費者執行緒從佇列中取出元素。所以,下面重點討論ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是採用什麼樣的資料結構和演算法來保證佇列的執行緒安全性的。

1.阻塞佇列 ArrayBlockingQueue

ArrayBlockingQueue底層的資料結構是陣列和迴圈佇列,使用一個可重入鎖和這個鎖的兩個條件物件進行併發控制。
首先,來看看ArrayBlockingQueue的屬性。

//存放元素的陣列
final Object[] items;
//迴圈佇列頭指標,起始值為0
int takeIndex;
//迴圈佇列尾指標,指向下一個元素插入的位置,起始值為0
int putIndex;
//元素的個數
int count;

//可重入鎖(被final修飾,之所以沒有初始化,是因為所有的構造方法裡面都對lock進行了初始化)
final ReentrantLock lock;
//佇列非空條件
private final Condition notEmpty;
//佇列未滿條件
private final Condition notFull;

ArrayBlockingQueue的長度是固定的,無法擴容,所以建立一個ArrayBlockingQueue物件時,必須指定佇列的容量,並且ArrayBlockingQueue不允許原始為null。從建構函式上可以看出這一點。

//建立一個指定容量的佇列,鎖預設是非公平的
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
//建立一個指定容量、指定鎖的公平性的佇列
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    //建立鎖
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

//用現有的集合建立一個指定容量、指定鎖的公平性的佇列
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    //建立佇列
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion(鎖定只用於可見性,而不是互斥)
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        //尾指標
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

(1)插入元素add、offer、put

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

private void insert(E x) {
    items[putIndex] = x;
    //佇列尾指標+1
    putIndex = inc(putIndex);
    ++count;
    //通知在notEmpty上等待的執行緒
    notEmpty.signal();
}

//迴圈加。迴圈佇列的實現就體現在這裡
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //在鎖上等待,直到獲取鎖,但是會響應中斷,優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。
    //不明白為什麼add和offer方法使用lock,而put方法使用lockInterruptibly?
    lock.lockInterruptibly();
    try {
        //佇列已滿,在notFull物件上等待
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
}

(2)取出元素remove、poll、take

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E poll() {
   final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //佇列為空時返回null
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    //佇列頭指標+1
    takeIndex = inc(takeIndex);
    --count;
    //通知在notFull物件上等待的執行緒
    notFull.signal();
    return x;
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //佇列為空時,在notEmpty上等待
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}

2.阻塞佇列 LinkedBlockingQueueue

LinkedBlockingQueueue底層的資料結構是單向連結串列,使用兩個可重入鎖(放鎖和拿鎖)和物件的條件物件來進行併發控制。
LinkedBlockingQueueue由於使用了兩個鎖,所以允許同時新增和取出元素。這一點是和ArrayBlockingQueue最大的區別。

一個類的屬性體現了這個類的資料結構,我們首先看看LinkedBlockingQueueue的屬性

//連結串列的節點。從節點可以看出該連結串列只有一個next指標,是單向的,
static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}

//佇列的容量,定義為final,說明所有的構造方法必須初始化容量
private final int capacity;

//元素的個數,因為使用了放鎖和拿鎖兩個鎖,所以同時新增和取出元素時存在併發問題,使用原子操作來保證元素的個數的準確性
private final AtomicInteger count = new AtomicInteger(0);

//單向連結串列頭指標,head.item永遠為null。(定義為transient說明不能序列化)
private transient Node<E> head;

//單向連結串列尾指標,last.next永遠為null。(定義為transient說明不能序列化)
private transient Node<E> last;

//拿鎖(控制remove、poll、take方法等)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

//放鎖(控制add、offer、put方法等)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

照常理來說,取出一個元素後,佇列應該是notFull,那麼拿鎖控制的是應該是notFull的條件變數,但是因為此處存在兩把鎖,可能在取出元素後,又有元素加入了。所有此處拿鎖控制的是notEmpty,取出元素後,只要判斷剩下的元素是否大於1就可以了,因為不可能有兩個執行緒同時執行取操作。

(1)插入元素add、offer、put

//add方法是在AbstractQueue實現了,所以跟ArrayBlockingQueue一樣
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node(e);
    //鎖定放鎖
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //佇列未滿
        if (count.get() < capacity) {
            enqueue(node);
            //佇列長度+1
            c = count.getAndIncrement();
            //插入之後,佇列還是未滿,通知在notFull物件上的等待的執行緒(例如:put方法)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    //c==0表示插入之前佇列為空,佇列為空說明可能有讀執行緒在阻塞,如果c>0,說明肯定沒有讀執行緒在阻塞
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

//signalNotEmpty雖然用在offder/put中,但是從不在putLock的同步區內。這樣就保證同一時刻只持有一個鎖,這樣就不會出現死鎖問題。
//???關於此處為什麼加鎖的問題,暫時就是這樣理解的
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

//入隊。入隊操作很簡單,就是將連結串列尾指標的next節點指向當前節點,並把當前節點設定為尾指標
private void enqueue(Node<E> node) {
    last = last.next = node;
}

(2)取出操作remove、poll、take

//remove()方法是在AbstractQueue中實現了,跟ArrayBlockingQueue一樣
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            //出隊
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//出隊操作比較簡單,就是將單鏈表的頭指標指向下一個元素
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    //不是很明白這個,如果要幫助GC,直接將h.next=null不是更好嗎?
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            //在notEmpty條件上等待
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}    

3.阻塞佇列 SynchronousQueue

  SynchronousQueue跟上面兩個阻塞佇列不同,它內部沒有容器,一個生產執行緒put的時候,如果當前沒有消費執行緒執行take,此生產執行緒必須阻塞,等待一個消費執行緒呼叫take操作,take操作將會喚醒該生產執行緒,同時消費執行緒會獲取生產執行緒的資料(即資料傳遞),這樣的一個過程稱為一次配對過程(當然也可以先take後put,原理是一樣的)。

4.非阻塞佇列 ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是阻塞的,阻塞體現在入隊和出隊的時候需要加鎖。
下面介紹的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底層的資料結構和LinkedBlockingQueue相同,也是使用單鏈表,不同的是ConcurrentLinkedQueue通過sun.misc.Unsafe類的CAS操作來保證執行緒安全的。

Unsafe類提供了硬體級別的原子操作,主要compareAndSwapXXX方法實現。
關於Unsafe,網上有很多資源,請自行查閱。

我們首先來看看ConcurrentLinkedQueue的成員變數。

//單鏈表頭節點
private transient volatile Node<E> head;

//單鏈表尾節點
private transient volatile Node<E> tail;

//節點型別。與LinkedBlockingQueue不同的是,所有的賦值操作都是通過Unsafe物件的CAS來完成的,所以是執行緒安全的
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    //為item賦值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    //為next指標賦值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            //獲取item屬性和next屬性的記憶體地址
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

(1)入隊add、offer
這裡寫圖片描述
需要注意的是,每次入隊之後,tail並不是總指向最後一個節點。奇數時是倒數第二個節點,偶數時是第一個節點。

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        //獲得p的下一個節點
        Node<E> q = p.next;
        //如果下一個節點是null,也就是p節點就是尾節點
        if (q == null) {
            //將單鏈表的尾節點的next指標指向新節點
            if (p.casNext(null, newNode)) {
                if (p != t)
                     //如果tail不是尾節點則將入隊節點設定為tail。
                     // 如果失敗了,那麼說明有其他執行緒已經把tail移動過
                    casTail(t, newNode);
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 如果p節點等於p的next節點,則說明p節點和q節點都為空,表示佇列剛初始化,所以返回
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

//為佇列的尾節點賦值
private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}