介紹

當你看本文時,需要具備以下知識點

二叉樹、完全二叉樹、二叉堆二叉樹的表示方法

如果上述內容不懂也沒關係可以先看概念。

PriorityBlockingQueue是一個無界的基於陣列的優先順序阻塞佇列,陣列的預設長度是11,雖然指定了陣列的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都返回優先級別最高的或者最低的元素。其實內部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最後一個元素進行交換,這樣最大的或最小的數就到了最後面,然後最後一個不變,重新構造前面的陣列元素以此類推進行堆排序。預設比較器是null,也就是使用佇列中元素的compareTo方法進行比較,意味著佇列元素要實現Comparable介面。

  • PriorityBlockingQueue是一個無界佇列,佇列滿時沒有進行阻塞限制,也就是沒有notFull進行阻塞,所以put是非阻塞的。
  • lock鎖獨佔鎖控制只有一個執行緒進行入隊、出隊操作。

平衡二叉樹堆

二叉堆的本質其實是一個完全二叉樹,它分為兩種型別:

  • 最大堆:最大堆的任何一個父節點的值都大於或等於它的左、右孩子節點的值。
  • 最小堆:最小堆的任何一個父節點的值都小於或等於它的左、右孩子節點的值。

二叉堆的根節點叫做堆頂

最大堆最小堆的特點:最大堆的堆頂是整個堆中最大元素,最小堆的堆頂是整個堆中最小元素。

我們知道二叉堆內部實現其實是基於陣列來實現的,為什麼二叉堆又能使用陣列來實現呢?因為二叉堆是完全二叉樹,並不會浪費空間資源,對於稀疏二叉樹如果使用陣列來實現會有很多左右結點為空的情況,陣列中需要進行佔位處理,佔位處理就會浪費很多空間,得不償失,但是二叉堆是一個完全二叉樹,所以不會有資源的浪費。

陣列下標表示方法:

左節點:2*parent+1

右節點:2*parent+2

n座標節點父節點:n/2

PriorityBlockingQueue類圖結構

原始碼分析

通過類圖可以清晰的發現它其實也是繼承自BlockingQueue介面以及Queue介面,說明也是阻塞佇列。在建構函式中預設佇列的容量是11,由於上面我們已經提到了,優先佇列使用的是二叉堆來實現的,二叉堆實現是根據陣列來實現的,所以預設構造器中初始化容量為11,如下程式碼所示:

/**
* 預設陣列長度。
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11; /**
* 最大陣列允許的長度。
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

上面講述了二叉堆的原理,二叉堆原理肯定是要進行比較大小,預設比較器是null,也就是使用元素的compareTo方法進行比較來確定元素的優先順序,就意味著佇列元素必須實現Comparable介面,如下是建構函式:

/**
* 建立一個預設長度為11的佇列,預設比較器為null。
* Creates a {@code PriorityBlockingQueue} with the default
* initial capacity (11) that orders its elements according to
* their {@linkplain Comparable natural ordering}.
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
} /**
* 建立一個指定陣列初始化長度的佇列,預設比較器為null。
* Creates a {@code PriorityBlockingQueue} with the specified
* initial capacity that orders its elements according to their
* {@linkplain Comparable natural ordering}.
*
* @param initialCapacity the initial capacity for this priority queue
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
} /**
* 建立一個指定陣列初始化長度的佇列,比較器可以自己指定。
* Creates a {@code PriorityBlockingQueue} with the specified initial
* capacity that orders its elements according to the specified
* comparator.
*
* @param initialCapacity the initial capacity for this priority queue
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
} /**
* 傳入一個集合,如果集合是SortedSet或PriorityBlockingQueue的話不需要進行堆化,直接使用原有的排序即可,如果不是則需要呼叫
* heapify方法進行堆初始化操作。
* Creates a {@code PriorityBlockingQueue} containing the elements
* in the specified collection. If the specified collection is a
* {@link SortedSet} or a {@link PriorityQueue}, this
* priority queue will be ordered according to the same ordering.
* Otherwise, this priority queue will be ordered according to the
* {@linkplain Comparable natural ordering} of its elements.
*
* @param c the collection whose elements are to be placed
* into this priority queue
* @throws ClassCastException if elements of the specified collection
* cannot be compared to one another according to the priority
* queue's ordering
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true表示需要進行堆化也就是初始化基於現有集合初始化一個二叉堆,使用下沉方式。
boolean screen = true; // true表示需要篩選空值
if (c instanceof Sort3edSet<?>) {
// 如果是SortedSet不需要進行堆初始化操作。
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
// 不需要進行堆初始化。
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
// 如果是PriorityBlockingQueue不需要進行堆初始化操作。
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
// 不需要篩選空值。
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
// 不需要進行堆初始化操作。
heapify = false;
}
// 將集合轉換成陣列型別。
Object[] a = c.toArray();
// 獲取陣列的長度大小
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 將轉化陣列的物件賦值給佇列,以及實際儲存的長度。
this.queue = a;
this.size = n;
if (heapify)
// 調整堆大小。
heapify();
}

建構函式中第四個建構函式傳遞的是一個集合,集合如果是SortSet和PriorityBlockingQueue是不需要進行堆初始化操作,如果是其他集合型別則需要進行堆排序。

private void heapify() {
Object[] array = queue;
int n = size;
// 這裡其實就是尋找完全二叉樹中最後一個非葉子節點值,由於陣列元素是從0開始的下標,長度是從1開始的所以需要減掉1相等於是陣列元素下標最後一個元素下標/2得到的值,也就是最後一個元素的父節點。
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
// 迴圈遍歷非葉子節點的值。
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}

例如我們初始化陣列為a=[7,1,3,10,5,2,8,9,6],完全二叉樹表示為:

首先half=9/2-1=4-1=3,a[3]=10,剛好是完全二叉樹中最後一個非葉子節點,再往上非葉子節點是3,1,7剛好陣列下標是遞減的,然後針對元素10進行下沉操作,發現左右子樹中最小元素是6,則6和10元素進行交換。

然後再下沉元素3,i--操作,得到如下完全二叉樹

繼續遍歷下一個非葉子節點,i=2減少1得到i=1,此時a[1]=1左右子節點6和5都比1大所以不需要進行變動,然後i進行減少,則到了i=0,此時a[7]=0,發現左右節點中元素1是最小的,所以先下沉到元素1的位置。



下沉後發現還可以繼續下沉,因為左右節點中右節點元素5是最小的所以還需要進行下沉操作。

至此下沉結束,二叉堆構建完成。

offer操作

接下來看一下佇列是如何進行構建成一個二叉堆的,其實這裡面構建二叉堆以及二叉堆獲取資料時,會採用上浮下沉的操作來進行處理整個二叉堆的平衡,詳細來看一下offer方法,程式碼如下所示:

/**
* Inserts the specified element into this priority queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Queue#offer})
* @throws ClassCastException if the specified element cannot be compared
* with elements currently in the priority queue according to the
* priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 首先獲取鎖物件。
final ReentrantLock lock = this.lock;
// 只有一個執行緒操作入隊和出隊動作。
lock.lock();
// n代表陣列的實際儲存內容的大小
// cap代表隊列的整體大小,也就是陣列的長度。
int n, cap;
Object[] array;
// 如果陣列實際長度大於等於陣列的長度時,需要進行擴容操作。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 如果使用者指定比較器,則使用使用者指定的比較器來進行比較,如果沒有則使用預設比較器。
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進行上浮操作。
siftUpComparable(n, e, array);
else
// 進行上浮操作。
siftUpUsingComparator(n, e, array, cmp);
// 實際長度增加1,由於有且僅有一個執行緒操作佇列,所以這裡並沒有使用原子性操作。
size = n + 1;
// 通知等待的執行緒,佇列已經有資料,可以獲取資料。
notEmpty.signal();
} finally {
// 解鎖操作。
lock.unlock();
}
// 返回操作成功。
return true;
}
  1. 首先獲取鎖物件,控制有且僅有一個執行緒操作佇列。
  2. 如果陣列實際儲存的內容大小大於等於陣列長度時進行擴容操作,呼叫tryGrow方法進行擴容。
  3. 使用比較器進行比較,進行上浮操作來建立二叉堆。

tryGrow是如何進行擴容的呢?

/**
* Tries to grow array to accommodate at least one more element
* (but normally expand by about 50%), giving up (allowing retry)
* on contention (which we expect to be rare). Call only while
* holding lock.
*
* @param array the heap array
* @param oldCap the length of the array
*/
private void tryGrow(Object[] array, int oldCap) {
// 這裡先釋放了鎖,為什麼要釋放鎖呢?詳細請見下面。
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 這裡allocationSpinLock預設是0,通過CAS來講該值修改為1,也就是同時只有一個執行緒進行擴容操作。
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果原容量小於64,就執行原容量+2,如果原容量大於64,擴大一倍容量。
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 新容量超過了最大容量,將容量調整為最大值。
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 建立新的陣列物件。
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 擴容成功,則將值修改為0。
allocationSpinLock = 0;
}
}
// 這裡是為了其他執行緒進入後優先讓擴容的執行緒進行操作。
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 加鎖操作。
lock.lock();
// 將原資料拷貝到新陣列中。
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

看似擴容還是很容易理解的,開始的時候為什麼要先釋放鎖呢,然後用CAS控制只有一個執行緒可以擴容呢?其實可以不釋放鎖,也能控制只允許一個執行緒進行擴容操作,但是會產生效能問題,因為當我擴容的時候,一直獲得鎖會不允許其他執行緒進行入隊和出隊操作,擴容的時候先釋放鎖,也就是可以其他執行緒進行處理出隊和入隊操作,例如第一個執行緒先CAS成功,並且進行擴容中,此時第二個執行緒進入後allocationSpinLock此時已經等於1了,也就是不會進行擴容操作,而是會直接進入到判斷newArray==null,此時執行緒二發現newArray為null,執行緒而會呼叫Thread.yield來讓出CPU,目的是讓擴容的執行緒擴容後優先呼叫lock.lock重新獲得鎖,但是這又不能完全保證,有可能yield退出了擴容還沒有結束,此時執行緒二獲得鎖,如果當前陣列擴容沒有完畢,則執行緒二會再次呼叫offer的tryGrow進行擴容操作,再次給擴容執行緒讓出了鎖,再次呼叫yield讓出CPU。當擴容執行緒進行擴容時,其他執行緒自旋的檢測當前執行緒是否成功,成功了才會進行入隊的操作。

擴容成功後就需要構建二叉堆,構建二叉堆呼叫的是siftUpComparable方法,也就是上浮操作,接下來詳細講解一下上浮操作是如何進行的?

private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 找到節點k的父節點。
int parent = (k - 1) >>> 1;
// 獲取父節點的值。
Object e = array[parent];
// 比較父節點和插入值的大小,如果插入值大於父節點直接插入到末尾。
if (key.compareTo((T) e) >= 0)
break;
// 將父節點設定到子節點,小資料進行上浮操作。
array[k] = e;
// 將父節點的下標設定給k。
k = parent;
}
// 最後key上浮到k的位置。
array[k] = key;
} private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}

為了能夠演示演算法的整個過程,這裡舉例來說明一下:

為了演示擴容的過程我們先初始化佇列長度為2,後面會進行擴容操作。

  1. 當我們呼叫offer(3)時,此時k=n=size=0,x是我們要插入的內容,array是二叉堆陣列,當我們插入第一個元素是,發現k>0是不成立的,所以直接執行 array[k] = key,此時二叉堆陣列只有一個元素3,如下圖所示:

  2. 第二次呼叫offer(5),此時k=n=size=1實際儲存長度為1,執行(k-1)>>>1尋找父節點,k-1=0,無符號向右側移動一位,就相當於是$(k-1)/2$,和開始介紹二叉堆特點的時候尋找父節點是一樣的,此時要插入的節點5的父節點是3,需要進行比較3和5的大小,發現父節點小於要插入的節點,所以執行break退出迴圈,執行array[k] = key操作,將節點5插入到父節點3下面,並且size進行增加1,此時size=2,如下所示:

  3. 當第三次呼叫offer(9)時,此時k=n=size=2,發現(n = size) >= (cap = (array = queue).length)實際長度與陣列的長度相等,此時進行擴容操作,通過上述原始碼分析得到,oldCap+(oldCap+2)=2+(2+2)=6,陣列的長度從長度為2擴容到長度為6,將原有陣列內容賦值到新的陣列中,擴容之後進入到上浮操作進行入隊操作,其實怎麼理解呢?可以理解為我們將元素9插入到陣列最後一個位置,也就是佇列的最後一個位置。

    然後9這個元素需要找到它的父節點,那就是3,也即是(k-1)>>>1=(2-1)>>>1得到下標為0,array[0]=3,元素9的父節點是3,比較父節點和子節點大小,發現3<9位置不需要進行變動,則二叉堆就變成如下內容,size進行加1,size=3。

  4. 第四次呼叫offer(2)時,此時k=n=size=3,還是按照上面意思將元素2暫時插入到陣列的末尾,然後進行上浮操作,虛線的意思就是告訴你我現在還不一定在不在這裡,我要和我的父親比較下到底誰大,如果父節點大那我只能乖乖在這裡嘍,如果父節點小,不好意思我得往上浮動了,接下來看一上浮動的過程。

    k=3,元素2的父節點=(k-1)>>>1=2>>>1等於1,也就是array[1]的元素5是元素2的父節點,通過上面的二叉圖也能夠清晰看到元素5是父節點,比較發現2<5,執行array[k] = e,array[3]=5,也就是說將5的位置進行下沉操作,這裡原始碼並沒發現2上浮操作,但是在下一次比較中又用到了我們的元素2進行比較,其實現在樹的結構相當於如下所示:



    發現剛開始元素2在最後節點,現在被替換成了元素5,這就意味著元素2的位置變到了之前元素5的位置,原始碼中的k = parent,此時元素2的下標已經變到k=1,但是數組裡面的內容並沒有變,只是元素下標上浮操作,上浮到父節點,相當於元素2就在元素5的位置,只是下標1的位置元素內容並沒有直接替換成元素2僅此而已。接下來還會進行迴圈,陣列下標1的元素的父節點是元素3,這裡就不計算了,因為上面第二步計算過,此時發現2<3,需要將元素3的內容進行下沉操作,元素2的下標進行上浮操作。

    此時下標2的元素移動到父節點下標0的位置,發現k<0,則本次迴圈結束,將array[0]替換成元素2,本次上浮操作結束。

    由此可見,次堆為最小二叉堆,當出隊操作時彈出的是最小的元素。

poll操作

poll操作就是將佇列內部二叉堆的堆頂元素出隊操作,如果佇列為空則返回null。如下是poll的原始碼:

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
  1. 首先獲取鎖物件,進行上鎖操作,有且僅有一個執行緒出隊和入隊操作
  2. 獲得到鎖物件之後進行出隊操作,呼叫dequeue方法
  3. 最後釋放鎖物件

接下來看一下出隊操作是如何進行的,

/**
* Mechanics for poll(). Call only while holding lock.
*/
private E dequeue() {
// 陣列的元素的個數。
int n = size - 1;
// 如果陣列中不存在元素則直接返回null。
if (n < 0)
return null;
else {
// 獲取佇列陣列。
Object[] array = queue;
// 將第一個元素也就是二叉堆的根結點堆頂元素作為返回結果。
E result = (E) array[0];
// 獲取陣列中最後一個元素。
E x = (E) array[n];
// 將最後一個元素設定為null。
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 進行下沉操作。
siftDownComparable(0, x, array, n);
else
// 進行下沉操作。
siftDownUsingComparator(0, x, array, n, cmp);
// 實際元素大小減少1.
size = n;
// 返回結果。
return result;
}
}

通過原始碼可以看到,出隊的內容就是二叉堆的堆頂元素arrra[0],而後面它有取到的完全二叉樹的最後一個節點,將最後一個節點設定為null,然後在呼叫了siftDownComparable對堆進行了調整動作,看一下這個方法的具體實現內容,然後再結合上面入隊的內容進行講解出隊是如何保證二叉堆平衡的。

private static <T> void siftDownComparable(int k, T x, Object[] array,                                           int n) {    if (n > 0) {        Comparable<? super T> key = (Comparable<? super T>)x;      	// 最後一個節點的父節點,也就是代表到這裡之後就結束了。        int half = n >>> 1;           // loop while a non-leaf        while (k < half) {          	// child=2n+1代表leftChild左節點。            int child = (k << 1) + 1; // assume left child is least          	// 獲取左節點的值。            Object c = array[child];          	// 獲取右節點=2n+1+1=2n+2的座標            int right = child + 1;          	// 如果右側節點座標小於陣列中最有一個元素,並且右節點值小於左側節點值,則將c設定為右側節點值。            if (right < n &&                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)                c = array[child = right];          	// 對比c和當前最後一個元素的大小。            if (key.compareTo((T) c) <= 0)                break;          	// 將座標k位置設定為比較的後的值。            array[k] = c;          	// 將游標移動到替換的節點上。            k = child;        }      	// 最後將元素最後一個值賦值到k的位置。        array[k] = key;    }}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,                                                int n,                                                Comparator<? super T> cmp) {    if (n > 0) {        int half = n >>> 1;        while (k < half) {            int child = (k << 1) + 1;            Object c = array[child];            int right = child + 1;            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)                c = array[child = right];            if (cmp.compare(x, (T) c) <= 0)                break;            array[k] = c;            k = child;        }        array[k] = x;    }}

看了原始碼其實還是有點蒙的,因為根本沒有辦法想象到是如何實現的,接下來跟著思路一步一步的分析,上面offer時最後二叉堆的情況如下圖所示:

  1. 先呼叫一次poll操作,此時size=4,result=array[0]=2,n=size-1=3,x=array[n]=array[3]=5,將array[3]位置設定為null,就變成如下的二叉堆

    陣列中最後一個元素相當於直接被設定為空了,從二叉堆中移除掉了,移除掉了放在那裡呢?其實這裡大家可以理解為放在堆頂,原因是什麼呢?我們想一下,堆頂元素相當於是要被出隊操作,元素2已經出隊了,但是沒有堆頂元素了,此時需要怎麼操作呢?直接從陣列中找到完全二叉樹葉子節點最後一個節點,將最後一個節點設定到堆頂,然後將堆頂元素進行下沉操作,但這裡原始碼中並沒有實際設定元素5到堆頂,而是經過比較的過程將元素5從堆頂進行下沉的操作,接下來一步一步分析,目前可以將堆看做如下:

    然後通過原始碼可以看到它是現將堆頂也就是元素2的左右節點相比較,比較3<9所以最小節點c=3,然後再跟元素5(可以看做現在是在堆頂)進行比較3<5,發現元素5大於元素3,將元素3的位置替換原堆頂的元素2(此時我們可以完全可以看成元素5的位置也再堆頂,其實就是替換元素5的位置內容),然後元素5的下標從0調整到原來元素3下標1的位置,這個動作我們稱之為下沉操作,下沉過程中並沒有進行內容交換,只是座標進行下沉操作了,此時二叉堆內容如下所示:

    half=3>>>1=1,k的下標在這個時候已經變為1,進行k<half時發現等於false,本次迴圈結束,將下標1位置替換真實下沉的內容 array[k] = key=array[1]=5,此時二叉堆內容如下所示:

最後將原調整前的result返回。

put操作

put的操作內部其實就是呼叫了offer操作,由於是無界陣列可以進行擴充,所以不需要進行阻塞操作。

public void put(E e) {
offer(e); // never need to block
}

take操作

take操作當佇列為空時進行阻塞操作,當佇列不為空呼叫offer操作時會呼叫notEmpty.signal();通知等待的執行緒,佇列中已經有資料了,可以繼續獲取資料了,原始碼如下:

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 呼叫的事可以相應中斷。
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
// 如果佇列為空時等待。
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

總結

  1. PriorityBlockingQueue是一個阻塞佇列,繼承自BlockingQueue
  2. PriorityBlockingQueue的元素必須實現Comparable介面。
  3. PriorityBlockingQueue內部實現原理是基於二叉堆來實現的,二叉堆的實現是基於陣列來實現的。
  4. PriorityBlockingQueue是一個無界陣列,插入值時不需要進行阻塞操作,獲取值如果佇列為空時阻塞執行緒獲取值。