LinkedBlockingQueue原始碼解析
上一篇部落格,我們介紹了ArrayBlockQueue,知道了它是基於陣列實現的有界阻塞佇列,既然有基於陣列實現的,那麼一定有基於連結串列實現的隊列了,沒錯,當然有,這就是我們今天的主角:LinkedBlockingQueue。ArrayBlockQueue是有界的,那麼LinkedBlockingQueue是有界還是無界的呢?我覺得可以說是有界的,也可以說是無界的,為什麼這麼說呢?看下去你就知道了。
和上篇部落格一樣,我們還是先看下LinkedBlockingQueue的基本應用,然後解析LinkedBlockingQueue的核心程式碼。
LinkedBlockingQueue基本應用
public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue(); linkedBlockingQueue.add(15); linkedBlockingQueue.add(60); linkedBlockingQueue.offer(50); linkedBlockingQueue.put(100); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.size()); System.out.println(linkedBlockingQueue.take()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.poll()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.peek()); System.out.println(linkedBlockingQueue); System.out.println(linkedBlockingQueue.remove(50)); System.out.println(linkedBlockingQueue); } 複製程式碼
執行結果:
[15, 60, 50, 100] 4 15 [60, 50, 100] 60 [50, 100] 50 [50, 100] true [100] 複製程式碼
程式碼比較簡單,先試著分析下:
- 建立了一個LinkedBlockingQueue 。
- 分別使用add/offer/put方法向LinkedBlockingQueue中新增元素,其中add方法執行了兩次。
- 打印出LinkedBlockingQueue:[15, 60, 50, 100]。
- 打印出LinkedBlockingQueue的size:4。
- 使用take方法彈出第一個元素,並打印出來:15。
- 打印出LinkedBlockingQueue:[60, 50, 100]。
- 使用poll方法彈出第一個元素,並打印出來:60。
- 打印出LinkedBlockingQueue:[50, 100]。
- 使用peek方法彈出第一個元素,並打印出來:50。
- 打印出LinkedBlockingQueue:[50, 100]。
- 使用remove方法,移除值為50的元素,返回true。
- 打印出LinkedBlockingQueue:100。
程式碼比較簡單,但是還是有些細節不明白:
- 底層是如何保證執行緒安全性的?
- 資料儲存在哪裡,以什麼形式儲存的?
- offer/add/put都是往佇列裡面新增元素,區別是什麼?
- poll/take/peek都是彈出佇列的元素,區別是什麼?
要解決上面的疑問,最好的途徑還是看原始碼,下面我們就來看看LinkedBlockingQueue的核心原始碼。
LinkedBlockingQueue原始碼解析
構造方法
LinkedBlockingQueue提供了三個構造方法,如下圖所示:

我們一個一個來分析。
LinkedBlockingQueue()
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } 複製程式碼
無參的構造方法竟然直接把“鍋”甩出去了,甩給了另外一個構造方法,但是我們要注意傳的引數:Integer.MAX_VALUE。
LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } 複製程式碼
- 判斷傳入的capacity是否合法,如果不大於0,直接丟擲異常。
- 把傳入的capacity賦值給capacity。
- 新建一個Node節點,並且把此節點賦值給head和last欄位。
這個capacity是什麼呢?如果大家對程式碼有一定的感覺的話,應該很容易猜到這是LinkedBlockingQueue的最大容量。如果我們呼叫無參的構造方法來建立LinkedBlockingQueue的話,那麼它的最大容量就是Integer.MAX_VALUE,我們把它稱為“無界”,但是我們也可以指定最大容量,那麼此佇列又是一個“有界”隊列了,所以有些部落格很草率的說LinkedBlockingQueue是有界佇列,或者是無界佇列,個人認為這是不嚴謹的。
我們再來看看這個Node是個什麼鬼:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } 複製程式碼
是不是有一種莫名的親切感,很明顯,這是單向連結串列的實現呀,next指向的就是下一個Node。
LinkedBlockingQueue(Collection<? extends E> c)
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE);//呼叫第二個構造方法,傳入的capacity是Int的最大值,可以說 是一個無界佇列。 final ReentrantLock putLock = this.putLock; putLock.lock(); //開啟排他鎖 try { int n = 0;//用於記錄LinkedBlockingQueue的size //迴圈傳入的c集合 for (E e : c) { if (e == null)//如果e==null,則丟擲空指標異常 throw new NullPointerException(); if (n == capacity)//如果n==capacity,說明到了最大的容量,則丟擲“Queue full”異常 throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e));//入隊操作 ++n;//n自增 } count.set(n);//設定count } finally { putLock.unlock();//釋放排他鎖 } } 複製程式碼
- 呼叫第二個構造方法,傳入了int的最大值,所以可以說此時LinkedBlockingQueue是無界佇列。
- 開啟排他鎖putLock 。
- 定義了一個變數n,用來記錄當前LinkedBlockingQueue的size。
- 迴圈傳入的集合,如果其中的元素為null,則丟擲空指標異常,如果n==capacity,說明到了最大的容量,則丟擲“Queue full”異常,否則執行enqueue操作來進行入隊,然後n進行自增。
- 設定count為n,由此可知,count就是LinkedBlockingQueue的size了。
- 在finally中釋放排他鎖putLock 。
offer
public boolean offer(E e) { if (e == null) throw new NullPointerException();//如果傳入的元素為NULL,丟擲異常 final AtomicInteger count = this.count;//取出count if (count.get() == capacity)//如果count==capacity,說明到了最大容量,直接返回false return false; int c = -1;//表示size Node<E> node = new Node<E>(e);//新建Node節點 final ReentrantLock putLock = this.putLock; putLock.lock();//開啟排他鎖 try { if (count.get() < capacity) {//如果count<capacity,說明還沒有達到最大容量 enqueue(node);//入隊操作 c = count.getAndIncrement();//獲得count,賦值給c後完成自增操作 if (c + 1 < capacity)//如果c+1 <capacity,說明還有剩餘的空間,喚醒因為呼叫notFull的await方法而被阻塞的執行緒 notFull.signal(); } } finally { putLock.unlock();//在finally中釋放排他鎖 } if (c == 0)//如果c==0,說明釋放putLock的時候,佇列中有一個元素,則呼叫signalNotEmpty signalNotEmpty(); return c >= 0; } 複製程式碼
- 如果傳進來的元素為null,則丟擲異常。
- 把本類例項的count賦值給區域性變數count。
- 如果count==capacity,說明到了最大的容量,直接返回false。
- 定義區域性變數c,用來表示size,初始值是-1。
- 新建Node節點。
- 開啟排他鎖putLock。
- 如果count>=capacity,說明到了最大的容量,釋放排他鎖後,返回false,因為此時c=-1,c>=0為false;如果count<capacity,說明還有剩餘空間,繼續往下執行。這裡需要思考一個問題,為什麼第三步已經判斷過了是否還有剩餘空間,這裡還要再判斷一次呢?因為可能有多個執行緒都在執行add/offer/put方法,當佇列沒有滿的時候,多個執行緒同時執行到第三步(第三步的時候還沒有開啟排他鎖),然後同時往下走,所以開啟排他鎖後,還需要重新判斷下。
- 執行入隊操作。
- 獲得count,並且賦值給c後,完成自增的操作。注意,是先賦值後自增,賦值和自增的先後順序會直接影響到後面的判斷邏輯。
- 如果c+1<capacity,說明還有剩餘的空間,喚醒因為呼叫notFull的await方法而被阻塞的執行緒。這裡為什麼要+1再進行判斷?因為在第9步中,是先賦值後自增,也就是說區域性變數c儲存的還是入隊之前LinkedBlockingQueue的size,所以要先進行+1操作,得到的才是當前LinkedBlockingQueue的size。
- 在finally中,釋放排他鎖putLock。
- 如果c==0,說明在釋放putLock排他鎖的時候,佇列中有且只有一個元素,則呼叫signalNotEmpty方法。讓我們來看看signalNotEmpty方法:
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } 複製程式碼
程式碼比較簡單,就是開啟排他鎖,喚醒因為呼叫notEmpty的await方法而被阻塞的執行緒,但是這裡需要注意,這裡獲得的排他鎖已經不再是putLock,而是takeLock。
add
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } 複製程式碼
add方法直接呼叫了offer方法,但是add和offer還不完全一樣,當佇列滿了,如果呼叫offer方法,會直接返回false,但是呼叫add方法,會丟擲"Queue full"的異常。
put
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException();//如果傳入的元素為NULL,丟擲異常 int c = -1;//表示size Node<E> node = new Node<E>(e);//新建Node節點 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;//獲得count putLock.lockInterruptibly();//開啟排他鎖 try { //如果到了最大容量,呼叫notFull的await方法,等待喚醒,用while迴圈,是為了防止虛假喚醒 while (count.get() == capacity) { notFull.await(); } enqueue(node);//入隊 c = count.getAndIncrement();//count先賦值給c後,再進行自增操作 if (c + 1 < capacity)//如果c+1<capacity,呼叫notFull的signal方法,喚醒因為呼叫notFull的await方法而被阻塞的執行緒 notFull.signal(); } finally { putLock.unlock();//釋放排他鎖 } if (c == 0)//如果佇列中有一個元素,喚醒因為呼叫notEmpty的await方法而被阻塞的執行緒 signalNotEmpty(); } 複製程式碼
- 如果傳入的元素為NULL,則丟擲異常。
- 定義一個區域性變數c,來表示size,初始值是-1。
- 新建Node節點。
- 把本類例項中的count賦值給區域性變數count。
- 開啟排他鎖putLock。
- 如果到了最大容量,則呼叫notFull的await方法,阻塞當前執行緒,等待其他執行緒呼叫notFull的signal方法來喚醒自己,這裡用while迴圈是為了防止虛假喚醒。
- 執行入隊操作。
- count先賦值給c後,再進行自增操作。
- 如果c+1<capacity,說明還有剩餘的空間,則呼叫notFull的signal方法,喚醒因為呼叫notFull的await方法而被阻塞的執行緒。
- 釋放排他鎖putLock。
- 如果佇列中有且只有一個元素,喚醒因為呼叫notEmpty的await方法而被阻塞的執行緒。
enqueue
private void enqueue(Node<E> node) { last = last.next = node; } 複製程式碼
入隊操作是不是特別簡單,就是把傳入的Node節點,賦值給last節點的next欄位,再賦值給last欄位,從而形成一個單向連結串列。
小總結
至此offer/add/put的核心原始碼已經分析完畢,我們來做一個小總結,offer/add/put都是新增元素的方法,不過他們之間還是有所區別的,當佇列滿了,呼叫以上三個方法會出現不同的情況:
- offer:直接返回false。
- add:雖然內部也呼叫了offer方法,但是佇列滿了,會丟擲異常。
- put:執行緒會阻塞住,等待喚醒。
size
public int size() { return count.get(); } 複製程式碼
沒什麼好說的,count記錄著LinkedBlockingQueue的size,獲得後返回就是了。
take
public E take() throws InterruptedException { E x; int c = -1;//size final AtomicInteger count = this.count;//獲得count final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//開啟排他鎖 try { while (count.get() == 0) {//說明目前佇列中沒有資料 notEmpty.await();//阻塞,等待喚醒 } x = dequeue();//出隊 c = count.getAndDecrement();//先賦值,後自減 if (c > 1)//如果size>1,說明在出隊之前,佇列中有至少兩個元素 notEmpty.signal();//喚醒因為呼叫notEmpty的await方法而被阻塞的執行緒 } finally { takeLock.unlock();//釋放排他鎖 } if (c == capacity)//如果佇列中還有一個剩餘空間 signalNotFull(); return x; } 複製程式碼
- 定義區域性變數c,用來表示size,初始值是-1。
- 把本類例項的count欄位賦值給臨時變數count。
- 開啟響應中斷的排他鎖takeLock 。
- 如果count==0,說明目前佇列中沒有資料,就阻塞當前執行緒,等待喚醒,直到其他執行緒呼叫了notEmpty的signal方法喚醒了當前執行緒。用while迴圈是為了防止虛假喚醒。
- 進行出隊操作。
- count先賦值給c後,在進行自減操作,這裡需要注意是先賦值,後自減。
- 如果c>1,也就是size>1,結合上面的先賦值,後自減,可知如果滿足條件,說明在出隊之前,佇列中至少有兩個元素,則呼叫notEmpty的signal方法,喚醒因為呼叫notEmpty的await方法而被阻塞的執行緒。
- 釋放排他鎖takeLock 。
- 如果執行出隊後,佇列中有且只有一個剩餘空間,換個說法,就是執行出隊操作前,佇列是滿的,則呼叫signalNotFull方法。
我們再來看下signalNotFull方法:
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } 複製程式碼
- 開啟排他鎖,注意這裡的排他鎖是putLock 。
- 呼叫notFull的signal方法,喚醒因為呼叫notFull的await方法而被阻塞的執行緒。
- 釋放排他鎖putLock 。
poll
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } 複製程式碼
相比take方法,最大的區別就如果佇列為空,執行take方法會阻塞當前執行緒,直到被喚醒,而poll方法,直接返回null。
peek
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } 複製程式碼
peek方法,只是拿到頭節點的值,但是不會移除該節點。
dequeue
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } 複製程式碼
沒什麼好說的,就是彈出元素,並且移除彈出的元素。
小總結
至此take/poll/peek的核心原始碼已經分析完畢,我們來做一個小總結,take/poll/peek都是獲得頭節點值的方法,不過他們之間還是有所區別的:
- take:當佇列為空,會阻塞當前執行緒,直到被喚醒。會進行出隊操作,移除獲得的節點。
- poll:當佇列為空,直接返回null。會進行出隊操作,移除獲得的節點。
- put:當佇列為空,直接返回null。不會移除節點。
LinkedBlockingQueue的核心原始碼分析到這裡完畢了,謝謝大家。