Java 進階——多執行緒優化之執行緒池 ThreadPoolExecutor的核心容器阻塞佇列詳解(一)

#引言 多執行緒我想無論是後端開發,還是對於App開發者來說都不會陌生,何況Android強制要求不能在主執行緒中做網路請求,於是乎,在很多初學者或者App的原始碼中會出現會多的new Thread…的方式,這樣的程式碼是不優雅而且存在很多的隱患,假如說在使用者退出App之後,如果執行緒內的工作還未執行完畢此時是無法被回收的,更不必說如果是無限迴圈的執行緒,那麼可能永遠無法回收,永遠佔著記憶體和CPU資源,這是多麼大的浪費,而且專案龐大了還不好維護,所以如果你還是每次new Thread的話,我建議你閱讀下這篇文章或者學習下ThreadPoolExecutor執行緒池,瞭解一哈,在進入執行緒池的使用之前,先介紹下執行緒池框架中的核心容器——阻塞佇列,由於這兩篇文章不僅僅是簡單講解如何使用,會涉及到背後的一些設計思想和原始碼分析,理論知識會比較多,部分內容整理摘自《Java 併發程式設計的藝術》和JDK文件。

#一、執行緒池中的阻塞佇列模型 在我們學習執行緒池種類之前,先得了解一下阻塞佇列模型的相關知識。阻塞佇列是一個支援兩個附加操作的佇列——在佇列為空時,獲取元素的執行緒會等待佇列變為非空;而當佇列滿時,儲存元素的執行緒會等待佇列可用。在多執行緒程式設計過程中為了業務解耦和架構設計,經常會使用併發容器用於儲存多執行緒間的共享資料,這樣不僅可以保證執行緒安全,還可以簡化各個執行緒操作。前面我們在介紹執行緒池的時候說過執行緒池機制其實本質上就是**“生產者–消費者”模型(阻塞佇列常在“生產者–消費者”模型中充當容器角色,生產者是往阻塞佇列裡新增元素的執行緒,消費者是從阻塞佇列裡拿元素的執行緒),那麼

阻塞佇列就相當於是模型中的容器**,當生產者往佇列中新增元素時,如果佇列已經滿了,生產者所在的執行緒就會阻塞,直到消費者取元素時 notify 它; 消費者去佇列中取元素時,如果佇列中是空的,消費者所在的執行緒就會阻塞,直到生產者放入元素 notify 它,而所謂採取不同的策略就是基於不同的阻塞佇列模型的。JDK 1.6一共提供了7種阻塞佇列模式——ArrayBlockingQueueLinkedBlockingQueueDelayQueueSynchronousQueuePriorityBlockingQueue、LinkedTransferQueue、LinkedBlockingDeque,他們統一實現**java.util.concurrent.BlockingQueue< E >**泛型介面,為我們實現不同種類的執行緒池提供了基礎。

  • ArrayBlockingQueue ——一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue ——一個由連結串列結構組成的有界阻塞佇列。
  • PriorityBlockingQueue ——一個支援優先順序排序無界阻塞佇列。
  • DelayQueue——一個使用優先順序佇列實現的無界阻塞佇列。
  • SynchronousQueue——一個**“不儲存”**元素的阻塞佇列。
  • LinkedTransferQueue——一個由連結串列結構組成的無界阻塞佇列。
  • LinkedBlockingDeque——一個由連結串列結構組成的雙向阻塞佇列。

#二、BlockingQueue 主要四種的“異常”處理邏輯 通過上面BlockingQueue 主要原始碼部分,可以知道噹噹前佇列操作(新增/讀取)不可達時,BlockingQueue 通常會針對不同種類的操作採取不同的處理措施:

|方法\處理方式| 丟擲異常 |返回特殊值 |一直阻塞| 超時退出 |-----|------|-------------|—| |插入方法| add(e) |offer(e)| put(e)| offer(e,time,unit) |移除方法| remove()| poll() |take()| poll(time,unit) |檢查方法| element()| peek() |不可用 |不可用

  • 當執行**add(), remove(), element()**不可達時,丟擲異常 (指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常;當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 )

  • 當執行offer(), poll(), peek()返回特殊值(插入方法會返回布林值;移除方法則是從佇列裡拿出一個元素,如果沒有則返回null)

  • 當執行**put(), take()**不可達時,一直阻塞當前執行緒,直到操作可以進行 (當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。)

  • 當執行**offer, poll()**不可達時,阻塞一段時間,超時後退出 (當阻塞佇列滿\為空時,佇列會阻塞生產者\消費者執行緒一段時間,如果超過一定的時間,執行緒就會退出)

總之,BlockingQueue 中不允許有 null 元素,因此在 add(), offer(), put() 時如果引數是 null,會丟擲空指標,null 是用來有異常情況時做返回值的。


#三、ArrayBlockingQueue ArrayBlockingQueue 是一個內部使用陣列實現的有界佇列,一旦建立後,容量不可變(因為陣列不可變長)。佇列中的元素按 FIFO 原則進行排序,每次佇列頭部讀取元素,並插入元素到尾部。預設ArrayBlockingQueue 不保證執行緒公平的訪問佇列(公平訪問佇列是指阻塞的所有生產者執行緒或消費者執行緒,當佇列可用時,可以按照阻塞的先後順序訪問佇列,即先阻塞的生產者執行緒,可以先往佇列裡插入元素,先阻塞的消費者執行緒,可以先從佇列裡獲取元素)但是也正因為不公平所以從一定程度上提高了吞吐量(非公平性是對先等待的執行緒是非公平的,當佇列可用時,阻塞的執行緒都可以爭奪訪問佇列的資格,有可能先阻塞的執行緒最後才訪問佇列)。

**注意:只是預設是不保證執行緒公平的訪問佇列並非不能支援,當然也可以通過對應的構造方法new ArrayBlockingQueue(100,true)來構造公平的阻塞佇列

ArrayBlockingQueue的核心操作在此我們只關注佇列的建立插入元素獲取元素操作。 ##1、建立ArrayBlockingQueue 通過構造方法即可建立對應的例項。

方法 說明
ArrayBlockingQueue(int capacity, boolean fair) 建立指定容量的陣列,建立指定現場訪問策略的重入鎖和Condition
ArrayBlockingQueue(int capacity) 預設的建構函式只指定了佇列的容量並設定為非公平的執行緒訪問策略
**ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) ** 建立指定容量的陣列,建立指定現場訪問策略的重入鎖和Condition 並插入指定的元素

##2、向佇列中插入元素 在ArrayBlockingQueue中實現插入元素,有多個方法,本質上還是呼叫enqueue方法。

方法 說明
add(E e) 如果沒有超過佇列的容量,直接在隊尾處插入指定元素e,add(E) 呼叫了父類的方法,而父類裡沒有實現 offer(E),本質上還是呼叫自身的offer(E),如果返回 false 就丟擲異常。
offer(E e) 先申請鎖,拿到之後如果立即將e插入佇列沒有超過最大容量,則呼叫enqueue將e插入佇列尾部,如果佇列已滿返回false
offer(E,long,TimeUnit) 與offer(E e)大體上功能類似,區別在於可以設定等待超時時間,若已超過還不能有位置則返回 false;否則呼叫 enqueue(E),然後返回 true。
enqueue(E x) 如果新增元素後佇列滿了,就修改 putIndex 為 0 ;反之直接新增到陣列佇列尾部並呼叫 notEmpty.signal() 通知喚醒阻塞在獲取元素的執行緒
put() 功能和offer型別,put() 方法可以響應中斷,當佇列滿了,就呼叫 notFull.await() 阻塞等待,等有消費者獲取元素後繼續執行

##3、獲取佇列中的元素 在ArrayBlockingQueue中實現獲取元素,有多個方法,本質上還是呼叫dequeue方法。

方法 說明
E peek() 直接返回陣列中隊尾的元素,並不會刪除元素。如果佇列中沒有元素返回的是 null
E poll() 選申請鎖,拿到鎖之後,如果在佇列中沒有元素時會立即返回 null;如果有元素呼叫 dequeue()返回
E poll(long timeout, TimeUnit unit)(E e) 與offer(E e)大體上功能類似,區別在於可以允許阻塞一段時間,如果在阻塞一段時間還沒有元素進來,就返回 null
take() 與poll(E e)大體上功能類似,take() 方法可以響應中斷,如果佇列中沒有資料會一直阻塞等待,直到中斷或者有元素,有元素時還是呼叫 dequeue() 方法。
E dequeue() 從隊首移除元素(即 takeIndex 位置)移除後會向後移動 takeIndex,如果已經到隊尾,就歸零,其實 ArrayBlockingQueue 是個環形陣列

概括起來ArrayBlockingQueue 使用可重入鎖 ReentrantLock 控制佇列的插入和獲取,兩個 Condition 實現生產者 - 消費者模型。可以看出put和take方法主要是通過condition的通知機制來完成可阻塞式的插入資料和獲取資料

#四、LinkedBlockingQueue ArrayBlockingQueue 是一個使用陣列實現的阻塞佇列,而LinkedBlockingQueue則是使用連結串列實現的有界阻塞佇列,當構造物件時為指定佇列大小時,佇列預設大小為Integer.MAX_VALU,可以通過過載構造方法設定最大值,佇列中的元素按 FIFO 的原則進行排序,吞吐量比ArrayBlockingQueue 要大。

     * Removes a single instanc