Java 基礎學習筆記 —— 集合中的Queue
引言
繼上篇文章講過了Java中的List之後,接下來我們會關注另外一種集合型別——Queue。
Queue,也就是佇列,一種先進先出的資料結構。在Java中,從是否可以從尾部獲取元素分成了普通佇列以及雙端佇列。從是否會阻塞區分則分成了阻塞佇列和非阻塞佇列。這篇文章會從兩個方面對佇列進行介紹,第一部分主要介紹佇列的特點,第二部分會針對Java中一些比較典型的實現進行具體分析。
有一個小細節需要注意的就是,目前佇列裡面一般是不允許新增null元素的。
佇列簡介
首先還是看一下佇列的類圖。
從類圖中我們能夠看到,一共有4個介面類,Queue
,BlockingQueue
,TransferQueue
Deque
。這四個類分別定了四種類型的佇列的基本行為。首先我們會針對比較常用的Queue
以及BlockingQueue
來展開敘述。
Queue
Queue
是所有佇列都必須實現的介面,在這個接口裡面,定義了佇列最基本的方法。
public interface Queue<E> extends Collection<E> {
boolean add(E e); //新增元素,元素不可為null,若新增失敗,則丟擲異常
boolean offer(E e); //新增元素,元素不可為null,新增失敗,返回false,不丟擲異常
E remove(); //移除佇列首部元素,若佇列為空,則丟擲異常
E poll(); //移除佇列首部,若佇列為空,則返回null
E element();//獲取佇列首部元素,若佇列為空,則丟擲異常
E peek();//獲取佇列首部元素,若佇列為空,則返回null
}
簡單來看,佇列裡的介面主要定義了三個方法,新增元素,獲取元素,刪除元素,而又針對每個方法都定義了兩種版本的(一種操作失敗則返回異常,另外一種操作失敗則返回null或者是false)。
BlockingQueue
BlockingQueue
則定義了一些會發生等待的場景。如在佇列為空的時候進行元素獲取或者是在佇列已經滿了的時候進行元素新增。
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e); //將元素新增到佇列尾端,若新增失敗,直接丟擲異常
boolean offer(E e);//將元素新增到佇列尾端,若新增失敗,直接返回false
void put(E e) throws InterruptedException; //將元素新增到佇列尾端,若佇列已滿則將執行緒掛起等待
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException; //將元素新增到佇列尾端,若佇列已滿,則等待一段時間,若超時後仍無空間,直接返回false
E take() throws InterruptedException;//獲取佇列首個元素,若佇列為空,則將執行緒掛起進行等待
E poll(long timeout, TimeUnit unit)
throws InterruptedException; //獲取佇列首個元素,若佇列為空,則等待一段時間,若超時後仍無空間,返回null
int remainingCapacity();//獲取佇列的剩餘空間
boolean remove(Object o);//通過equal判斷元素是否存在佇列中,若存在,則將其移除
public boolean contains(Object o);//通過equal判斷元素是否存在佇列中,若存在,返回true
int drainTo(Collection<? super E> c);//將佇列中所有元素轉移到一個新的集合中,返回值為轉移成功的元素數量
int drainTo(Collection<? super E> c, int maxElements);//將佇列中所有元素轉移到一個新的集合中,最多轉移maxElements個,返回值為轉移成功的元素數量
}
從方法定義上來看,由於BlockingQueue
要確保在高併發下的執行緒安全,因此它提供了更多的方法進行獲取和刪除佇列元素。
具體實現
接下來還是針對具體的實現來研究具體的佇列。
PriorityQueue
PriorityQueue
是一個優先佇列。優先佇列的意思就是,從佇列裡面獲取元素的時候,會根據其優先順序,從小到大進行獲取,而非元素的新增順序。這要求我們在使用優先佇列的時候,要麼使用實現了Comparable
的類進行元素新增,要麼在初始化佇列的時候指定Comparator
。
PriorityQueue
是基於堆排序演算法來實現其優先順序的。
首先我們來看下其內部成員變數
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable {
private static final long serialVersionUID = -7720805057305804111L;//用於序列化
private static final int DEFAULT_INITIAL_CAPACITY = 11; //預設容量
transient Object[] queue; // 使用陣列對元素進行儲存
private int size = 0; //初始化的佇列大小,為0
private final Comparator<? super E> comparator; //自定義的比較器
transient int modCount = 0; //版本號
}
然後再來看下幾個關鍵的方法
offer
方法
public boolean offer(E e) {
if (e == null) //不能新增null元素
throw new NullPointerException();
modCount++;//版本號增加
int i = size;
if (i >= queue.length)
grow(i + 1);//擴容
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e); //堆調整
return true;
}
擴容演算法
private void grow(int minCapacity) {
int oldCapacity = queue.length;
//與佇列不同,這裡的擴容方式是,在小容量的情況下,每次擴容為原來的一倍,否則為原來的1.5倍
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
//同樣需要判斷是否已經超過了陣列的最大容納量
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
堆調整演算法
//新增時,元素是直接被新增到佇列尾端的,也就是堆的底部(佇列首端為堆頂),因此會從底部向上調整,直至這個元素找到合適的位置安放
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; //獲取其父節點
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)//若父節點比當前節點要小,則無需再進行調整
break;
queue[k] = e;//若父節點比當前節點大,則將當前節點的值調整到父節點處
k = parent;//父節點將繼續進行調整
}
queue[k] = key; //最後安置元素
}
poll
方法
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0]; //獲取首部元素
E x = (E) queue[s]; //獲取佇列尾部元素
queue[s] = null; //尾部位置置成空
if (s != 0)
siftDown(0, x); //堆調整,這次相當於將佇列尾部的元素放到了堆頂,然後進行向下調整
return result;
}
//相當於從第n個元素開始,從上往下進行調整,直到元素x找到合適的位置
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // 只需要將非葉子結點遍歷就夠了
while (k < half) {
int child = (k << 1) + 1; // 首先,獲得節點的左子節點
Object c = queue[child];
int right = child + 1; //同時也獲取節點的右子節點
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right]; //若右子節點比左子節點小,則使用右子節點進行比較。這裡使用較小值進行比較,是為了確保後續swap之後,在頂部的元素比子節點要小
if (key.compareTo((E) c) <= 0) //如果當前元素已經比子節點都要小了,直接退出迴圈
break;
queue[k] = c;//否則,進行交換
k = child;
}
queue[k] = key;//最後為確定位置的元素賦值
}
最後看一下其內部迭代器實現。
private final class Itr implements Iterator<E> {
private int cursor = 0; //遊標指標,用於表示遍歷到第幾個
private int lastRet = -1; //上次返回的序號
private ArrayDeque<E> forgetMeNot = null; //若在遍歷過程中發生了元素刪除操作,堆調整後,元素可能會被放置在佇列頭部,為了防止漏掉這部分的元素遍歷,這裡再單獨建立一個佇列用於儲存這些元素
private E lastRetElt = null; //上次返回的元素
private int expectedModCount = modCount;//在迭代過程中,若發生了版本號變更,則會導致ConcurrentModificationException的丟擲
//是否有下一個元素,兩個判斷組成,首先當前遊標小於佇列長度,其次受到順序影響的元素數量為0
public boolean hasNext() {
return cursor < size ||
(forgetMeNot != null && !forgetMeNot.isEmpty());
}
@SuppressWarnings("unchecked")
public E next() {
if (expectedModCount != modCount)
throw new ConcurrentModificationException();
if (cursor < size)//首先遍歷佇列中的元素
return (E) queue[lastRet = cursor++];
if (forgetMeNot != null) { //然後遍歷順序受影響的元素
lastRet = -1;//同時將這個標記為-1,代表元素是從受影響的佇列中獲取的
lastRetElt = forgetMeNot.poll();
if (lastRetElt != null)
return lastRetElt;
}
throw new NoSuchElementException();
}
//刪除元素
public void remove() {
if (expectedModCount != modCount)
throw new ConcurrentModificationException();
//如果lastRet不為-1,代表目前遍歷的元素是通過遊標獲取的,可以直接在相應的位置進行刪除。
if (lastRet != -1) {
E moved = PriorityQueue.this.removeAt(lastRet);
lastRet = -1;
if (moved == null)
cursor--;
else {
if (forgetMeNot == null)
forgetMeNot = new ArrayDeque<>();
forgetMeNot.add(moved);
}
//如果lastRet == -1 且lastRetElt 不為null,則說明當前元素是從受影響佇列中獲取的,此時只能通過removeEq進行移除
} else if (lastRetElt != null) {
PriorityQueue.this.removeEq(lastRetElt);
lastRetElt = null;
} else {
throw new IllegalStateException();
}
expectedModCount = modCount;
}
}
最後瞭解一下removeAt
函式
private E removeAt(int i) {
modCount++;
int s = --size;
if (s == i) // 如果i和s相等,直接將佇列最後一個元素置null即可。
queue[i] = null;
else {
//否則,就需要取出佇列最後一個元素,並且將其放置在第i位上
E moved = (E) queue[s];
queue[s] = null;
//首先,向下調整
siftDown(i, moved);
//若向下調整時元素仍保持不變,說明元素已經是此子樹上最小值,需要考慮進行向上調整
if (queue[i] == moved) {
siftUp(i, moved);
//如果向上調整,元素位置發生了變化,則將此元素返回(迭代器進行遍歷的時候,無法通過遊標遍歷到了)
if (queue[i] != moved)
return moved;
}
}
return null;
}
列舉一個會出現這種情況的例子
如上圖所示,如果不存在forgetMeNot
儲存被調整到前面的元素,那麼在遍歷的時候,就不能確保遍歷佇列的所有元素了。
PriorityBlockingQueue
PriorityBlockingQueue
是一種阻塞佇列,它與PriorityQueue
一樣是一個基於陣列的二叉堆。對於所有的public操作如增刪查,它使用一個鎖進行控制,而在擴容的時候,會使用一個自旋鎖進行控制(擴容時除了自旋鎖之外沒有別的鎖,這樣可以確保擴容時不影響take
操作)。又因為優先佇列容量是沒有限制的(最大為Integer.MAX_VALUE,超出會OOM),所以在put
操作中不需要進行等待。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L; //用於序列化的UID
private static final int DEFAULT_INITIAL_CAPACITY = 11;//預設的陣列容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大陣列容納量,仍然是由於部分VM會將部分空間用作保留字
private transient Object[] queue; //使用陣列來進行存放元素,這個陣列會表示一個二叉堆
private transient int size; //佇列大小
private transient Comparator<? super E> comparator;//比較器
private final ReentrantLock lock; //鎖
private final Condition notEmpty; //等待訊號,若佇列為空,則將take操作的執行緒掛起等待
private transient volatile int allocationSpinLock; //用於擴容時的自旋鎖
private PriorityQueue<E> q; //仍然保留了一個優先佇列的物件,用於序列化以及反序列化
}
實際上通過其變數就大概能夠知道,這裡的操作,實際上只有在take
以及帶超時時間的offer
操作中,是需要加鎖進行的。具體細節就不在這裡展開了。主要還是瞭解一下它的迭代器。
final class Itr implements Iterator<E> {
final Object[] array; // 儲存了所有元素的陣列
int cursor; // 遊標
int lastRet; // 上次返回元素,實際上用於標記是否進行過刪除操作,避免重複刪除,以及在未開始遍歷時的刪除
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
從這個迭代器的實現中我們不難發現,這個迭代器並沒有涉及到任何加鎖的地方。實際上在初始化這個迭代器的時候,首先會對當前佇列做一個SNAPSHOT,將所有元素複製一遍,並且將其賦值到迭代器中。這樣,迭代器就不需要考慮併發場景下的遍歷了,因為對它而言,它已經不需要保證執行緒安全了。
LinkedBlockingQueue
與前面提到的兩個佇列不同,LinkedBlockingQueue
是通過連結串列來實現佇列的。
首先還是看一下其內部結構
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final int capacity; //可以為其設定一個最大容量,若不設定的話,預設就是Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger();//記錄當前佇列中的元素數量
transient Node<E> head; //頭指標
private transient Node<E> last;//尾指標
//讀鎖,寫鎖分開。以確保讀寫之間不會相互影響
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();//使用take操作時,若當前佇列為空,則進行等待
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();//使用put操作時,若當前佇列為空,則進行等待。
}
可以看到,由於在這裡涉及到了頭指標和尾指標,因此使用了兩個鎖分別對其進行控制,主要的原因還是為了避免在操作其中一個指標的時候,阻塞其它操作。如put操作以及poll操作,只是在操作過程中使用鎖進行了條件控制。
這裡主要了解一下元素的入對操作enqueue
以及出對操作dequeue
enqueue
private void enqueue(Node<E> node) {
last = last.next = node;//入隊操作較為簡單,只需要將node放到佇列末端即可,此時這種操作的執行緒安全性由上層方法保證
}
dequeue
方法
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 為了讓其能夠被回收,將已經廢棄的head引用指向自身
head = first;
E x = first.item;
first.item = null; //為了表明這個元素已經被移除了,將其item置為null
return x;
}
重點還是放在其迭代器上
private class Itr implements Iterator<E> {
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock(); //在建立迭代器的時候,需要同時把頭指標和尾指標都鎖住。因為有可能頭尾指標指向同一個元素
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return current != null;
}
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p)//若s==p,則代表這個元素是被移除了的佇列首部元素,因此在這個時候,直接返回最新的head.next即可
return head.next;
if (s == null || s.item != null) //若s為null,則意味著已經到佇列的尾端了。若s.item不為null,則說明當前元素未被移除,仍然可以進行返回
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
public void remove() {
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
//這樣做比直接調remove(Object)的好處是,避免了佇列中有多個相等元素時的誤刪
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
從其迭代器的實現上我們能夠發現,迭代器並不會嚴格保證被刪除的元素不會再被遍歷到(每一次next操作會預先讀取下一個元素的值),而且在迭代器中呼叫remove方法的時候,也是會遍歷全部陣列的。因此,最好的建議就是,避免既使用concurrent中的佇列,又使用其迭代器進行遍歷。因為迭代器遍歷的,只能保證是某一時刻的的快照,既然是多執行緒讀寫,就可能會出現一些我們預料之外的狀態。
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是通過樂觀鎖形式來支援高併發佇列的實現類,關於這個類的具體剖析更多的可以看這篇文章。這篇文章從狀態機角度切入,分析了應該如何去實現一個高併發的類。
小結
這篇文章主要介紹了集合中的佇列。並具體介紹了一個非併發佇列——PriorityQueue
,三個支援併發的類,ConcurrentPriorityQueue
,LinkedBlockingQueue
以及使用樂觀鎖的ConcurrentLinkedQueue
。並且對部分關鍵操作進行了分析。這些分析主要關注具體實現背後的資料結構,以及相關操作具體的效能。