1. 程式人生 > >java多執行緒:執行緒池原理、阻塞佇列

java多執行緒:執行緒池原理、阻塞佇列


# 一、執行緒池定義和使用
jdk 1.5 之後就引入了執行緒池。 ## 1.1 定義
從上面的空間切換看得出來,執行緒是稀缺資源,它的建立與銷燬是一個相對偏重且耗資源的操作,而Java執行緒依賴於核心執行緒,建立執行緒需要進行作業系統狀態切換。為避免資源過度消耗需要設法重用執行緒執行多個任務。執行緒池就是一個執行緒快取,負責對執行緒進行統一分配、調優與監控。(資料庫連線池也是一樣的道理) **什麼時候使用執行緒池?** 單個任務處理時間比較短;需要處理的任務數量很大。 **執行緒池優勢?** * 重用存在的執行緒,減少執行緒建立、消亡的開銷,提高效能、提高響應速度。 * 當任務到達時,任務可以不需要等到執行緒建立就能立即執行。 * 提高執行緒的可管理性,可統一分配,調優和監控。 ## 1.2 執行緒池在 jdk 已有的實現
* 在 juc 包下,有一個介面:Executor : * Executor 又有兩個子介面:ExecutorService 和 ScheduledExecutorService,常用的介面是 ExecutorService。 * 同時常用的執行緒池的工具類叫 Executors。 例如: ```java ExecutorService service = Executors.newCachedThreadPool(); ``` **Executor 框架**雖然提供瞭如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等建立執行緒池的方法,但都有其侷限性,不夠靈活。 上面的幾種方式點進去會發現,都是用 ThreadPoolExecutor 進行建立的: 1. newSingleThreadExecutor 字面意思 **簡單執行緒執行器**。
2. newFixedThreadPool 字面意思**固定的執行緒池**,傳參就是執行緒固定數目,適用於執行長期任務的場景。
3. newCachedThreadPool 字面意思 **快取執行緒池**,核心執行緒0,最大執行緒非常大,動態建立的特點。
4. newScheduledThreadPool 字面意思 **時間安排執行緒池**,指定核心執行緒數。
5. newSingleThreadScheduledExecutor 字面意思 **單執行緒安排執行器**,也就是基於只有一個核心執行緒的執行器之外,又可以擴充套件。其中又用 DelegatedExecutorService 委託執行器服務進行了包裝。
可以看到,上面直接用 Executors 工具類預設的一些實現 new 出來的執行緒池都是用的 ThreadPoolExecutor 執行緒執行器這個類進行構造的,不過引數不同,導致了效果的側重點不同。 因此,自己建立執行緒池推薦的方法就是,直接使用 ThreadPoolExecutor 進行個性化的建立:
構造方法種的引數有 7 個: * corePoolSize:執行緒池維護執行緒的最少數量 (core : **核心**) * maximumPoolSize:執行緒池維護執行緒的**最大**數量,顯然必須>=1 * keepAliveTime:執行緒池維護的**多餘的執行緒**所允許的**空閒時間**,最長可以空閒多久,時間到了,如果超過 corePoolSize 的執行緒一直空閒,他們就會被銷燬。 * unit:執行緒池維護執行緒所允許的空閒時間的單位 * workQueue:執行緒池所使用的**緩衝佇列**,已經提交但是沒有執行的任務會放進這裡 * threadFactory:生成執行緒池種工作執行緒的執行緒**工廠**,一般使用預設 * handler:執行緒池對**拒絕**任務的處理**策略**,當佇列滿且工作執行緒已經達到maximumPoolSize。 阿里的 java 開發手冊,**強制要求,通過 ThreadPoolExecutor 來自定義**,不能使用內建的,避免資源耗盡。這個很好理解,1 的型別就只有一個核心執行緒和最大現場,2 沒有擴充套件性,3、4、5的最大執行緒數太大,記憶體會爆炸。 ## 1.3 執行緒池使用方法
這裡我們用固定執行緒池來測試,傳入核心執行緒數為 5,最大數量自然就也是 5, ```java public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(5); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入引數為實現了 Runnable 介面的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號執行緒辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } } ``` 其中,execute 方法就是將**任務提交**的方法,我們用 lambda 表示式給 execute 方法傳入了引數,實際上相當於一個完整的實現了 Runnable 介面的類。 執行結果:
可以看到,我們迴圈了 10 次,執行任務,但是執行緒只用到了 1-5 ,其中有多次**複用**。 再比如,我們按照各種型別的執行緒池,自己定義一個執行緒池,核心執行緒數 2, 最大執行緒數 5,阻塞佇列長度為 3: ```java public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { //模擬10個顧客辦理業務 for (int i = 0; i < 10; i++){ //execute 執行方法,傳入引數為實現了 Runnable 介面的類 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"號執行緒辦理業務"); }); } } catch (Exception e){ e.printStackTrace(); } finally { threadPool.shutdown(); } } ``` 同樣 10 個執行緒,執行起來:
可以看到,執行了 8 個任務後,就丟擲了異常,說明執行了**拒絕策略**。 > 上面兩個示例,我們的任務本身都是沒有返回值的,如果建立的任務本身需要有返回值就需要實現 Callable 介面,然後搭配FutureTask 來傳入任務,那麼執行緒池就應該呼叫 submit 方法而不是 execute。
# 二、執行緒池底層原理
## 2.1 執行緒池執行邏輯
處理的流程核心就 execute() 方法,他接收一個實現了 Runnable 介面的任務,決定對這個任務的處理策略。
下圖是一個比較形象的策略流程:
可能的情況有四種,也就是圖中的1234: 1. 如果執行緒池中的執行緒數量少於corePoolSize,就建立新的**核心執行緒**來執行新新增的任務 2. 如果執行緒池中的執行緒數量大於等於corePoolSize,但佇列workQueue未滿,則將新新增的任務放到**佇列**workQueue中 3. 如果執行緒池中的執行緒數量大於等於corePoolSize,且佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的**非核心執行緒**來處理被新增的任務 4. 如果執行緒池中的執行緒數量等於了maximumPoolSize,就用RejectedExecutionHandler來**執行拒絕策略**。會丟擲異常,一般的拒絕策略是RejectedExecutionException > 注意,執行的順序,在 java 裡有一個不合理的地方: 在池裡安排任務的時候,我們的核心執行緒,佇列,非核心執行緒裡面排的任務順序應該是 1 2 3; > 但是真正實現上,如果三個都滿了,開始執行的時候,依次執行的順序卻是 核心執行緒,非核心執行緒,佇列。也就是執行順序會變成 1 3 2 ## 2.2 拒絕策略
有些時候,我們並不希望拒絕策略是直接丟擲異常,那麼 jdk 裡面提供的預設拒絕策略有 4 種,他們體現在程式碼中就是 ThreadPoolExecutor 的四個靜態內部類:
### 2.2.1 CallerRunsPolicy:呼叫者執行策略。 這種策略不會拋棄任務,也不丟擲異常,而是將某些任務回退給呼叫者,從而降低新任務的流量。
實現非常簡單,那就是如果說 e 這個執行緒池已經 shutdown 了,那麼就什麼也不幹,也就是這個任務直接丟了;否則,r.run() ,相當於呼叫這個方法的執行緒裡直接執行了這個 Runnable 任務。 此時我們可以把 1.3 裡的程式碼修改一下,只修改策略為 CallerRunsPolicy:
可以看到,有些任務會在 main 執行緒裡處理。 ### 2.2.2 AbortPolicy:終止策略。 拋異常。前面已經試過了,這個是預設的拒絕策略。
### 2.2.3 DiscardPolicy:丟棄任務。 可以看到,原始碼裡就是是什麼也不做。如果場景中允許任務丟失,這個是最好的策略。
### 2.2.4 DiscardOldestPolicy:拋棄佇列中等待最久的任務。 拋棄佇列中等待最久的任務,然後把當前的任務加入佇列中,嘗試再次提交當前任務。 原始碼裡也就是利用佇列操作,進行一次出隊操作,然後重新呼叫 execute 方法。
## 2.3 執行緒池的五種狀態
和**一個正常的執行緒的生命週期**區別開,這個是**執行緒池裡執行緒**的狀態。 1. Running,能接受新任務以及處理已新增的任務; 2. Shutdown,不接受新任務,可以處理已經新增的任務,也就是不能再呼叫execute或者submit了; 3. Stop,不接受新任務,不處理已經新增的任務,並且中斷正在處理的任務; 4. Tidying,所有的任務已經終止,CTL記錄的任務數量為0,CTL負責記錄執行緒池的執行狀態與活動執行緒數量; 5. Terminated,執行緒池徹底終止,則執行緒池轉變為terminated的狀態。
如圖所示,從running狀態轉換為 shutdown,呼叫 shutdown()方法;如果呼叫shutdownNow()方法,就直接會變成stop。 terminated()是鉤子函式,預設是什麼也不做的,我們可以重寫,然後決定結束之前要做一些別的處理邏輯。這個鉤子函式,就是模板模式的方法。
# 三、阻塞佇列
執行緒池裡的 BlockingQueue,**阻塞佇列**,事實上在消費者生產者問題裡的管程法實現,我們的策略也是類似阻塞佇列的,用它來做一個快取池的作用。 **阻塞佇列**:任意時刻,不管併發有多高,永遠保證**只有一個執行緒能夠進行佇列的入隊或出隊操作**。也就意味著他是能夠保證執行緒安全的。 另外,阻塞佇列分為有界和無界佇列,理論上來說一個是佇列的size有固定,另一個是無界的。對於有界佇列來說,如果佇列存滿,只能出隊了,入隊操作就只能阻塞。 在 juc 包裡,阻塞佇列的實現有很多: 1. **ArrayBlockingQueue**:有界阻塞佇列; 2. **LinkedBlockingQueue**:連結串列結構(大小預設值為Integer.MAX_VALUE)的阻塞佇列; 3. PriorityBlockingQueue:支援優先順序排序的無界阻塞佇列; 4. DelayQueue:使用優先順序佇列實現的延遲無界阻塞佇列; 5. SynchronousQueue:不儲存元素的阻塞佇列,相當於只有一個元素; 6. LinkedTransferQueue:連結串列組成的無界阻塞佇列; 7. LinkedBlockingDeque:連結串列組成的雙向阻塞佇列。 對於 BlockingQueue 來說,核心操作主要有幾類:插入、刪除、查詢。
其中的四種異常策略: * **拋異常**:如果阻塞佇列滿,再往佇列裡 add 插入元素會拋 IllegalStateException:Queue full,如果阻塞佇列空,再 remove 就會拋 NoSuchElementException。 * **特殊值**:offer 方法:成功 true,失敗 false,poll 方法,成功就返回元素,沒有就返回 null。 * **阻塞**:阻塞佇列滿的時候,生產者執行緒繼續 put 元素,佇列就會阻塞直到可以 put 資料或者響應中斷然後退出,阻塞佇列空的時候,消費者執行緒繼續 take 元素,佇列就會一直阻塞直到有元素可以 take。 * **超時退出**:阻塞佇列滿的時候,會阻塞生產者執行緒且超時退出,空的時候會阻塞消費者執行緒且超時退出。 那麼使用的時候,增刪的方法按對應的同一組使用比較合理。(其實這個策略的設計對應的在單執行緒集合裡也有,那就是Deque介面的實現類 LinkedList 使用的時候,不同的增刪方法策略