1. 程式人生 > >使用 LinkedBlockingQueue 實現簡易版執行緒池

使用 LinkedBlockingQueue 實現簡易版執行緒池

前一陣子在做聯絡人的匯入功能,使用POI元件解析Excel檔案後獲取到聯絡人列表,校驗之後批量匯入。單從技術層面來說,匯入操作通常情況下是一個比較耗時的操作,而且如果聯絡人達到幾萬、幾十萬級別,必須拆分成為子任務來執行。綜上,可以使用執行緒池來解決問題。技術選型上,沒有采用已有的 ThreadPoolExecutor 框架,而使用了自制的簡易版執行緒池。該簡易版的執行緒池,其實也是一個簡易版的【生產者-消費者】模型,任務的加入就像是生產的過程,任務的處理就像是消費的過程。我們在這裡不去討論方案的合理性,只是從技術層面總結一下在實現簡易版執行緒池的過程中,我所學到的知識。   程式碼放在Github上,分享一下:https://github.com/Julius-Liu/threadpool

 

一、執行緒池設計

我們首先使用陣列 ArrayList 來作為執行緒池的儲存結構,例如陣列大小為10就意味著這是一個大小為10的執行緒池。然後我們使用 LinkedBlockingQueue(鏈式阻塞佇列)來存放執行緒的引數。示意圖如下:

 

當執行緒池裡的執行緒初始化完成後,我們希望執行緒都處於【飢餓】狀態,隨時等待引數傳入,然後執行。所以,此時執行緒應該處於阻塞狀態,如下圖所示:   當我們將一個執行任務(一個引數)交給執行緒池以後,執行緒池會安排一個執行緒接收引數,這個執行緒會進入執行狀態。執行緒執行完以後,執行緒又會因為引數佇列為空而進入阻塞狀態。某執行緒的執行狀態如下圖所示,執行完的阻塞態,如上圖所示。

 

假設執行緒池中有3個執行緒,我們連續扔了3個引數給執行緒池,執行緒池會輪詢獲取執行緒,將引數塞給他們,然後這些執行緒會進入執行狀態。執行完成後迴歸阻塞狀態。如下圖所示:

 

如下圖所示,假設執行緒池中只有3個執行緒,我們連續發8個引數給執行緒池,那麼池會輪流分配引數。執行緒在收到引數後就會執行。“消耗”掉一個引數後,會繼續消耗下一個引數,直到引數列表為空為止。

 

二、為什麼使用 LinkedBlockingQueue

1. BlockingQueue

我們必須先來說說為什麼使用阻塞佇列 BlockingQueue。BlockingQueue 佇列為空時,嘗試獲取隊頭元素的操作會阻塞,一直等到佇列中有元素時再返回。這個阻塞的特性,正是我們需要的,我們可以讓執行緒一直等待元素插入,一旦插入立即執行。BlockingQueue 也支援在新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。如此一來,我們交給執行緒池的任務就不會丟失,哪怕超過了佇列的容量。   所以我們定下方案,採用阻塞佇列來作為資料結構,然後我們來調研阻塞佇列常用的5種實現,看看選擇哪種實現來完成執行緒池。  

2. ArrayBlockingQueue

ArrayBlockingQueue 是一個用陣列實現的有界阻塞佇列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為新增和刪除的阻塞方法。可以說 ArrayBlockingQueue 是 阻塞佇列的最直觀的實現。  

3. DelayQueue

DelayQueue是一個無界阻塞佇列,延遲佇列提供了在指定時間才能獲取佇列元素的功能,佇列頭元素是最接近過期的元素。沒有過期元素的話,使用poll()方法會返回null值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於0來判斷。   DelayQueue阻塞佇列在我們系統開發中也常常會用到,例如快取系統的設計。快取中的物件,超過了空閒時間,需要從快取中移出;例如任務排程系統,需要準確的把握任務的執行時間。我們可能需要通過執行緒處理很多時間上要求很嚴格的資料,如果使用普通的執行緒,我們就需要遍歷所有的物件,一個個檢檢視資料是否過期。首先這樣在執行上的效率不會太高,其次就是這種設計的風格也大大的影響了資料的精度。一個需要12:00點執行的任務可能12:01 才執行,這樣對資料要求很高的系統有更大的弊端。使用 DelayQueue 可以做到精準觸發。   由上可知,延遲佇列不是我們需要的阻塞佇列實現。  

4. LinkedBlockingQueue

LinkedBlockingQueue是一個由連結串列實現的有界佇列阻塞佇列,但大小預設值為Integer.MAX_VALUE,也可以在初始化的時候指定 capacity。和 ArrayBlockingQueue 一樣,其中put方法和take方法為新增和刪除的阻塞方法。  

5. PriorityBlockingQueue

優先順序阻塞佇列通過使用堆這種資料結構實現將佇列中的元素按照某種排序規則進行排序,從而改變先進先出的佇列順序,提供開發者改變佇列中元素的順序的能力。佇列中的元素必須是可比較的,即實現Comparable介面,或者在構建函式時提供可對佇列元素進行比較的Comparator物件。不可以放null,會報空指標異常,也不可放置無法比較的元素;add方法新增元素時,是自下而上的調整堆,取出元素時,是自上而下的調整堆順序。   我們放入引數佇列中的引數都是平級的,不涉及優先順序,因此我們不考慮優先順序阻塞佇列。  

6. SynchronousQueue

同步佇列實際上不是一個真正的佇列,因為它不會為佇列中元素維護儲存空間。與其他佇列不同的是,它維護一組執行緒,這些執行緒在等待著把元素加入或移出佇列。同步佇列是輕量級的,不具有任何內部容量,我們可以用來線上程間安全的交換單一元素。 因為同步佇列沒有儲存功能,因此put和take會一直阻塞,直到有另一個執行緒已經準備好參與到交付過程中。僅當有足夠多的消費者,並且總是有一個消費者準備好獲取交付的工作時,才適合使用同步佇列。   應用場景,我們來看一下Java併發包裡的 newCachedThreadPool 方法:
 1 package java.util.concurrent;
 2 
 3 /**
 4  * 帶有快取的執行緒池
 5  */
 6 public static ExecutorService newCachedThreadPool() {
 7     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 8                                   60L, TimeUnit.SECONDS,
 9                                   new SynchronousQueue<Runnable>());
10 }

 

Executors.newCachedThreadPool() 方法返回的 ThreadPoolExecutor 例項,其內部的阻塞佇列使用的就是同步佇列。由於ThreadPoolExecutor內部實現任務提交的時候呼叫的是工作佇列的非阻塞式入佇列方法(offer方法),因此,在使用同步佇列作為工作佇列的前提下,客戶端程式碼向執行緒池提交任務時,而執行緒池中又沒有空閒的執行緒能夠從同步佇列佇列例項中取一個任務,那麼相應的offer方法呼叫就會失敗(即任務沒有被存入工作佇列)。此時,ThreadPoolExecutor會新建一個新的工作者執行緒用於對這個入佇列失敗的任務進行處理(假設此時執行緒池的大小還未達到其最大執行緒池大小)。   如上所述,同步佇列沒有內部容量來存放參數,因此我們不選擇同步佇列。  

7. 阻塞佇列選擇

研究了阻塞佇列的5中實現以後,候選者就在 ArrayBlockingQueue 和 LinkedBlockingQueue 兩者中。其實要實現本文的簡易版執行緒池,使用陣列阻塞佇列和連結阻塞佇列都可以,如果你要考慮一些極端情況下的效能問題,那麼透徹的研究兩者的使用場景就非常有必要。陣列阻塞佇列和連結阻塞佇列的成員變數和方法都很相似,相同點我們就先不說了。下面我們來看看兩者的不同點:
  1. 佇列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE)。對於後者而言,當新增速度大於移除速度時,在無界的情況下,可能會造成記憶體溢位等問題。
  2. 資料儲存容器不同,ArrayBlockingQueue採用的是陣列作為資料儲存容器,而LinkedBlockingQueue採用的則是以Node節點作為連線物件的連結串列。
  3. 由於ArrayBlockingQueue採用的是陣列的儲存容器,因此在插入或刪除元素時不會產生或銷燬任何額外的物件例項,而LinkedBlockingQueue則會生成一個額外的Node物件。這可能在長時間內需要高效併發地處理大批量資料的時,對於GC可能存在較大影響。
  4. 實現佇列新增或移除的鎖不一樣,ArrayBlockingQueue實現的佇列中的鎖是沒有分離的,即新增操作和移除操作採用的同一個ReentrantLock鎖,而LinkedBlockingQueue實現的佇列中的鎖是分離的,其新增採用的是putLock,移除採用的則是takeLock,這樣能大大提高佇列的吞吐量,也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
 

三、LinkedBlockingQueue 底層方法

我們來調研一下 LinkedBlockingQueue,看看哪些變數和方法可以使用。 先來看一下 LinkedBlockingQueue 的資料結構,有一個直觀的瞭解: