1. 程式人生 > >二、執行緒安全阻塞佇列 BlockingQueue 入門

二、執行緒安全阻塞佇列 BlockingQueue 入門

一、BlockingQueue繼承關係

java.util.concurrent 包裡的 BlockingQueue是一個介面, 繼承Queue介面,Queue介面繼承 Collection

BlockingQueue --> Queue –-> Collection

圖:

佇列的特點是:先進先出(FIFO—first in first out)

二、BlockingQueue的常用方法

往佇列中新增元素:  offer()、add()、 put()
從佇列中取出或者刪除元素: remove(),、peek()、element() 、 poll()、take()

每個方法的說明如下:

// 1.往佇列中新增元素:
offer()方法:往佇列新增元素如果佇列已滿直接返回false,佇列未滿則直接插入並返回true;

add()方法:對offer()方法的簡單封裝.如果佇列已滿,丟擲異常new illegalStateException("Queue full");

put()方法:往佇列裡插入元素,如果佇列已經滿,則會一直等待直到佇列中出現空位插入新元素,或者執行緒被中斷丟擲異常.

// 2.從佇列中取出或者刪除元素:
remove()方法:直接刪除隊頭的元素:

peek()方法:直接取出隊頭的元素,並不刪除.

element()方法:對peek方法進行簡單封裝,如果隊頭元素存在則取出並不刪除,如果不存在丟擲異常NoSuchElementException()

poll()方法:取出並刪除隊頭的元素,當佇列為空,返回null;

take()方法:取出並刪除隊頭的元素,當佇列為空,則會一直等待直到佇列有新元素可以取出,或者執行緒被中斷丟擲異常

offer()方法一般跟poll()方法相對應;
put()方法一般跟take()方法相對應;
日常開發過程中offer()與pool()方法用的相對比較頻繁。

三、BlockingQueue實現類

佇列 有界性 資料結構
ArrayBlockingQueue bounded(有界) 加鎖 arrayList
LinkedBlockingQueue optionally-bounded 加鎖 linkedList
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap
SynchronousQueue bounded 加鎖
LinkedTransferQueue unbounded 加鎖 heap
LinkedBlockingDeque unbounded 無鎖 heap

四、BlockingQueue實現類詳解

1 陣列阻塞佇列 ArrayBlockingQueue

一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素。佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列獲取操作則是從佇列頭部開始獲得元素。

這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致操作受阻塞;試圖從空佇列中提取元素將導致類似阻塞。

此類支援對等待的生產者執行緒和使用者執行緒進行排序的可選公平策略。預設情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設定為 true 而構造的佇列允許按照 FIFO 順序訪問執行緒。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”

BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1");
String string = queue.take();

2 鏈阻塞佇列 LinkedBlockingQueue

LinkedBlockingQueue 類實現了 BlockingQueue 介面。

LinkedBlockingQueue 內部以一個鏈式結構(連結節點)對其元素進行儲存。如果需要的話,這一鏈式結構可以選擇一個上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。

LinkedBlockingQueue 內部以 FIFO(先進先出)的順序對元素進行儲存。佇列中的頭元素在所有元素之中是放入時間最久的那個,而尾元素則是最短的那個。

BlockingQueue unbounded = new LinkedBlockingQueue();
BlockingQueue bounded   = new LinkedBlockingQueue(1024);
bounded.put("Value");
String value = bounded.take();
System.out.println(value);
System.out.println(unbounded.remainingCapacity()==Integer.MAX_VALUE);//true

3 延遲佇列DelayQueue

Delayed 元素的一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部 是延遲期滿後儲存時間最長的 Delayed 元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於等於 0 的值時,將發生到期。即使無法使用 take 或 poll 移除未到期的元素,也不會將這些元素作為正常元素對待。例如,size 方法同時返回到期和未到期元素的計數。此佇列不允許使用 null 元素

4 具有優先順序的阻塞佇列 PriorityBlockingQueue

PriorityBlockingQueue 類實現了 BlockingQueue 介面。

一個無界阻塞佇列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞獲取操作。雖然此佇列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先順序佇列也不允許插入不可比較的物件(這樣做會導致丟擲 ClassCastException)。

此類及其迭代器可以實現 Collection 和 Iterator 介面的所有可選 方法。iterator() 方法中提供的迭代器並不 保證以特定的順序遍歷 PriorityBlockingQueue 的元素。如果需要有序地進行遍歷,則應考慮使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按優先順序順序移除 全部或部分元素,並將它們放在另一個 collection 中。

在此類上進行的操作不保證具有同等優先順序的元素的順序。如果需要實施某一排序,那麼可以定義自定義類或者比較器,比較器可使用修改鍵斷開主優先順序值之間的聯絡。例如,以下是應用先進先出 (first-in-first-out) 規則斷開可比較元素之間聯絡的一個類。要使用該類,則需要插入一個新的 FIFOEntry(anEntry) 來替換普通的條目物件。

5 同步佇列 SynchronousQueue

SynchronousQueue 類實現了 BlockingQueue 介面。

SynchronousQueue 是一個特殊的佇列,它的內部同時只能夠容納單個元素。如果該佇列已有一元素的話,試圖向佇列中插入一個新元素的執行緒將會阻塞,直到另一個執行緒將該元素從佇列中抽走。同樣,如果該佇列為空,試圖向佇列中抽取一個元素的執行緒將會阻塞,直到另一個執行緒向佇列中插入了一條新的元素。

據此,把這個類稱作一個佇列顯然是誇大其詞了。它更多像是一個匯合點。

6 阻塞雙端佇列 BlockingDeque

java.util.concurrent 包裡的 BlockingDeque 介面表示一個執行緒安放入和提取例項的雙端佇列。本小節我將給你演示如何使用 BlockingDeque。

BlockingDeque 類是一個雙端佇列,在不能夠插入元素時,它將阻塞住試圖插入元素的執行緒;在不能夠抽取元素時,它將阻塞住試圖抽取的執行緒。

deque(雙端佇列) 是 “Double Ended Queue” 的縮寫。因此,雙端佇列是一個你可以從任意一端插入或者抽取元素的佇列。

7 鏈阻塞雙端佇列 LinkedBlockingDeque

一個基於已連結節點的、任選範圍的阻塞雙端佇列。

可選的容量範圍構造方法引數是一種防止過度膨脹的方式。如果未指定容量,那麼容量將等於 Integer.MAX_VALUE。只要插入元素不會使雙端佇列超出容量,每次插入後都將動態地建立連結節點。

大多數操作都以固定時間執行(不計阻塞消耗的時間)。異常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 以及批量操作,它們均以線性時間執行。

五、BlockingQueue用法

BlockingQueue 通常用於一個執行緒生產物件,而另外一個執行緒消費這些物件的場景。下圖是對這個原理的闡述:

一個執行緒往裡邊放,另外一個執行緒從裡邊取的一個 BlockingQueue。

一個執行緒將會持續生產新物件並將其插入到佇列之中,直到佇列達到它所能容納的臨界點。也就是說,它是有限的。如果該阻塞佇列到達了其臨界點,負責生產的執行緒將會在往裡邊插入新物件時發生阻塞。它會一直處於阻塞之中,直到負責消費的執行緒從佇列中拿走一個物件。

負責消費的執行緒將會一直從該阻塞佇列中拿出物件。如果消費執行緒嘗試去從一個空的佇列中提取物件的話,這個消費執行緒將會處於阻塞之中,直到一個生產執行緒把一個物件丟進佇列。

六、BlockingQueue Example

生產者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
	private final BlockingQueue<String> queue;

	Producer(BlockingQueue<String> q) {
		queue = q;
	}

	@Override
	public void run() {
		AtomicInteger i = new AtomicInteger(0);

		try {
			while (true) {
				String str = "" + i;

				queue.put(str);

				System.out.println("向佇列中新增資料:" + str);

				i.incrementAndGet();

				Thread.sleep(3000);

			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

消費者

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
	private final BlockingQueue<String> queue;

	Consumer(BlockingQueue<String> q) {
		queue = q;
	}

	@Override
	public void run() {
		try {
			while(true){
				String str= queue.take();
				
				System.out.println("從佇列中取出資料:" + str);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

測試方法

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestBlockingQueue {
	
	public static void main(String[] args) {
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
		Producer producer = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		
		// 使用有兩個核心執行緒的執行緒池
		ExecutorService executor = Executors.newFixedThreadPool(2);
		
		// 執行生產者執行緒和消費者執行緒
		executor.execute(producer);
		executor.execute(consumer);
	}
	
}

控制檯結果: