1. 程式人生 > >JUC原始碼分析-集合篇(七):PriorityBlockingQueue

JUC原始碼分析-集合篇(七):PriorityBlockingQueue

PriorityBlockingQueue 是二叉堆結構的無界優先順序阻塞佇列,使用顯示鎖 Lock 保證執行緒安全,是一個執行緒安全的 PriorityQueue。元素的優先順序順序通過 Comparator 實現,內部不可新增不可比較的值。相較於我們前幾章所講的Queue,PriorityBlockingQueue 算是一個老牌的隊列了,從JDK1.5加入JUC行列,如果我們需要對佇列進行優先順序排序,PriorityBlockingQueue 將是一個不錯的選擇。

資料結構及核心引數

//預設容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//陣列最大長度 
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 
//內部儲存元素的陣列,基於二叉堆實現
private transient Object[] queue;

//Lock used for all public operations
private final ReentrantLock lock;

//Condition for blocking when empty
private final Condition notEmpty;

/**初始0為可獲取狀態,用於控制擴容操作*/
private transient volatile int allocationSpinLock;
//比較器
private transient Comparator<? super E> comparator;
//內部PriorityQueue引用,用於相容序列化
private PriorityQueue<E> q;

二叉堆

PriorityBlockingQueue(後稱PBQ)的資料由內部的一個 Object 陣列(queue)儲存,這個陣列本質上是一個二叉堆

二叉堆

二叉堆是一種特殊的堆,二叉堆是完全二叉樹或者是近似完全二叉樹。二叉堆滿足堆特性:父節點的鍵值總是與任何一個子節點的鍵值保持固定的序關係,且每個節點的子節點也都是一個二叉堆。

當父節點的鍵值總是大於或等於任何一個子節點的鍵值時為最大堆。 當父節點的鍵值總是小於或等於任何一個子節點的鍵值時為最小堆

二叉堆一般用陣列來表示。如果根節點在陣列中的位置是1,第n個位置的子節點分別在 2*n 和 2*n+1。因此,第1個位置的子節點在2和3,第2個位置的子節點在4和5。以此類推。這種基於1的陣列儲存方式便於尋找父節點和子節點。如果儲存陣列的下標基於0,那麼下標為i的節點的子節點是 2*n + 1 與 2*(n+1);其父節點的下標是 (n − 1)/2,PriorityBlockingQueue 中使用的就是基於0下標的二叉堆。

原始碼解析

如果你理解了我們前幾章的內容,會發現 PBQ 的入隊/出隊操作都很簡單,沒有太複雜的演算法,無非就是對二叉堆的插入/刪除操作。

offer(E e)

//入隊
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        //佇列擴容
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        //找到合適位置插入元素
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

說明:首先對佇列加鎖,判斷佇列是否需要擴容,如果需要呼叫tryGrow()方法進行擴容;然後呼叫siftUpComparable()方法找到合適位置插入元素;更新佇列元素數size,喚醒等待notEmpty的執行緒,最後別忘了解除鎖定unlock。

  1. PBQ 的擴容說明:當佇列元素數大於等於陣列的長度時,會觸發擴容操作,擴容是由單執行緒完成的。如果陣列長度 cap 小於64,擴容長度為 2*(cap+1);否則擴容長度為原來的1.5倍(1.5*cap),tryGrow()原始碼如下:
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //計算新的陣列容量(1.5*cap或2*(cap+1))
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    //如果有其他執行緒已經在進行擴容操作,當前執行緒讓出執行時間片
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
  1. siftUpComparable()siftUpUsingComparator(),由於佇列中的元素都是有優先順序(基於comparator排序)的,所以如果有新元素進來不會像其他佇列一樣直接放在隊尾,而是通過這兩個方法找到新增元素在佇列中的排序位置然後插入,原始碼如下:
//在k位置插入元素x,從父節點開始向上找到合適位置,保持二元堆的性質不變
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        //從父節點開始向上查詢,並保持二叉堆性質
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;//找到合適位置,跳出迴圈
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}
//自定義Comparator版本的siftUpComparable
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        //從父節點開始向上查詢,並保持二叉堆性質
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;//找到合適位置,跳出迴圈
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

poll()

//出列
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];//第一個元素,即出列元素
        E x = (E) array[n];//最後一個元素
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        //重構二叉堆
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

說明: 在出列操作時,首先移除陣列的最後一個元素,然後呼叫siftDownComparablesiftDownUsingComparator方法進行二叉堆的重組,最後返回佇列的第一個元素。 PBQ的出列本質上是刪除二叉堆的根節點,然後,把堆儲存的最後那個節點移到填在根節點處,再從上而下調整父節點與它的子節點。siftDownComparablesiftDownUsingComparator原始碼如下:

//在k位置插入元素x,從子節點開始向下調整節點位置,保持二叉堆的性質不變
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        //獲取最後一個節點的父節點
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            //從左葉子節點向下調整
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = array[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

小結

本章重點:理解 PriorityBlockingQueue 的優先順序策略實現,二叉堆資料結構

作者:泰迪的bagwell 連結:https://www.jianshu.com/p/fd26c91cd2a0 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。