1. 程式人生 > >Java併發程式設計之深入執行緒池原理及實現

Java併發程式設計之深入執行緒池原理及實現

Java執行緒池在實際的應用開發中十分廣泛。雖然Java1.5之後在JUC包中提供了內建執行緒池可以拿來就用,但是這之前仍有許多老的應用和系統是需要程式設計師自己開發的。因此,基於執行緒池的需求背景、技術要求瞭解執行緒池原理和實現,一方面可以更為深刻理解Java多執行緒開發,有助於解決業務系統中因為執行緒問題所產生的bug;另一方面也有利於對遺留程式碼的重構。

如果需要先行了解Java併發程式設計的基礎知識,可參考以下隨筆:

執行緒池原理

所謂的執行緒池,跟JDBC連線池、代理池等一樣,屬於一種“池”的設計模式。在設計好的資料結構中存放一定數量的執行緒,並且根據任務需要自動調整執行緒數量的多少,直到峰值。具體說來,執行緒池需要滿足若干條件:

1. 任務佇列:用於快取提交的任務

2. QueueSize:任務佇列存放的Runnable任務例項的數量,需要有限制值防止記憶體溢位。

3. 數量管理:建立執行緒時初始的數量init;執行緒池自動擴充時最大的執行緒數量max;空閒時的活躍執行緒或核心執行緒數量core。三者滿足init<=core<=max

4. 任務拒絕策略:如果執行緒數量達到上限且任務佇列已滿,需要有拒絕策略通知任務提交者,這個在工程實踐中非常重要。

5. 執行緒工廠:用於個性化定製執行緒,如設定守護執行緒、執行緒名稱等。

6. Keepedalive時間:執行緒各個重要引數自動維護的時間間隔。

執行緒池原理圖如下:

執行緒池實現

“模組設計,類圖先行”。明確了執行緒池需要實現的功能之後,就可以畫出執行緒池的草圖了,核心介面及實現類如下:

ThreadPool介面

ThreadPool介面主要定義一個執行緒池的基本屬性,如任務提交、初始容量、最大容量、核心容量等。實現程式碼如下:

 1 public interface ThreadPool {
 2     
 3     //submit tasks to thread pool
 4     void execute(Runnable runnable);
 5     //close pool
 6     void shutdown();
 7     //
get the initial size of pool 8 int getInitSize(); 9 //get the max size of pool 10 int getMaxSize(); 11 //get the core size of pool 12 int getCoreSize(); 13 //get the cache tasks queue of pool 14 int getQueueSize(); 15 //get the active thread volume of pool 16 int getActiveCount(); 17 //check if pool has been shutdown 18 boolean isShutdown(); 19 }

RunnableQueue介面

這個介面的作用與BlockingQueue介面一樣,用於儲存提交的Runnable實現類任務。

1 public interface RunnableQueue {
2     //提交任務到佇列
3     void offer(Runnable runnable);
4     //從佇列中獲取任務
5     Runnable take() throws InterruptedException;
6     //返回佇列中任務數
7     int size();
8 }

ThreadFactory介面

定義了個性化建立執行緒的工廠方法

1 @FunctionalInterface
2 public interface ThreadFactory {
3     
4     Thread createThread(Runnable runnable);
5 
6 }

DenyPolicy介面

定義了執行緒池的拒絕策略,即當任務佇列達到上限時,採取何種措施拒絕。注意介面內定義了內部類作為外圍介面的實現類(該類自動為public和static,像這種巢狀類的實現,可查詢《Java程式設計思想》)。

 1 @FunctionalInterface
 2 public interface DenyPolicy {
 3     
 4     void reject(Runnable runnable, ThreadPool threadPool);
 5     //定義巢狀類作為拒絕策略的實現類
 6     //1.拒絕並丟棄任務
 7     class DiscardDenyPolicy implements DenyPolicy{
 8 
 9         @Override
10         public void reject(Runnable runnable, ThreadPool threadPool) {
11             
12         }    
13     }
14     
15     //2.拒絕並丟擲自定義異常
16     class AbortDenyPolicy implements DenyPolicy{
17 
18         @Override
19         public void reject(Runnable runnable, ThreadPool threadPool) {
20             throw new RunnableDenyException("The runnable " + runnable + " will abort.");
21         }        
22     }
23     
24     //3.拒絕, 使用提交者所線上程來完成執行緒任務.
25     class RunnerDenyPolicy implements DenyPolicy{
26 
27         @Override
28         public void reject(Runnable runnable, ThreadPool threadPool) {
29 
30             if(!threadPool.isShutdown()) {
31                 runnable.run();
32             }
33         }    
34     }
35 }

其實實現了自定義異常類RunnableDenyException:

1 public class RunnableDenyException extends RuntimeException {
2     
3     private static final long serialVersionUID = 112311231231412L;
4 
5     public RunnableDenyException(String message) {
6         super(message);
7     }
8 }

InternalTask實現類

Runnable的實現類,會使用到RunnableQueue,它的作用其實是封裝了一個任務例項,把Runnable任務的run方法封裝到自己的Run方法實現中,並且提供了一個stop方法,用於線上程池銷燬或數量維護時停止當前執行緒。

 1 public class InternalTask implements Runnable {
 2     //組合一個RunnableQueue的引用
 3     private final RunnableQueue runnableQueue;
 4     //使用volatile關鍵字修飾開關變數
 5     private volatile boolean running = true;
 6     
 7     public InternalTask(RunnableQueue runnableQueue) {
 8         this.runnableQueue = runnableQueue;
 9     }
10     @Override
11     public void run() {
12         // if current task match "both running and isInterrupt" are true
13         // continue to take runnable from queue and run
14         while(running && !Thread.currentThread().isInterrupted()) {
15             try {
16                 Runnable task = runnableQueue.take();
17                 task.run();
18             } catch (Exception e) {
19                 running = false;
20                 break;
21             }
22         }
23 
24     }
25     //停止執行緒的開關方法
26     public void stop() {
27         this.running = false;
28     }
29 }

到這裡,一個基本執行緒池的骨架就搭好了,接下來主要是實現各介面,實現具體的方法。

1. 佇列的實現類LinkedRunnableQueue

 1 public class LinkedRunnableQueue implements RunnableQueue {
 2     //設定佇列上限
 3     private final int limit;
 4     //設定拒絕策略的引用
 5     private final DenyPolicy denyPolicy;
 6     //使用LinkedList作為佇列的具體實現類
 7     private final LinkedList<Runnable> runnableList = new LinkedList<>();
 8     //設定執行緒池的引用
 9     private final ThreadPool threadPool;
10     //構造方法時賦初始值
11     public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
12         this.limit = limit;
13         this.denyPolicy = denyPolicy;
14         this.threadPool = threadPool;
15     }
16 
17     @Override
18     public void offer(Runnable runnable) {
19         //使用同步鎖, 確保入隊的執行緒安全
20         synchronized (runnableList) {
21             //當達到佇列上限, 呼叫拒絕策略;否則加入隊尾, 並喚醒阻塞中的執行緒.
22             if(runnableList.size() >= limit) {
23                 denyPolicy.reject(runnable, threadPool);
24             }else {
25                 runnableList.addLast(runnable);
26                 runnableList.notifyAll();
27             }
28         }
29     }
30 
31     @Override
32     public Runnable take() throws InterruptedException {
33         synchronized (runnableList) {
34             
35             while(runnableList.isEmpty()) {
36                 try {
37                     //如果佇列中沒有可執行任務, 執行緒掛起, 進入runnableList關聯的monitor waitset中等待被喚醒
38                     runnableList.wait();
39                 } catch (InterruptedException e) {
40                     //如果被中斷, 需要丟擲異常
41                     throw e;
42                 }
43             }
44             return runnableList.removeFirst();
45         }
46     }
47 
48     @Override
49     public int size() {
50         synchronized (runnableList) {
51             //返回佇列中的任務數量
52             return runnableList.size();
53         }
54     }
55 }

2. 執行緒工廠的實現

 1 public class DefaultThreadFactory implements ThreadFactory {
 2     //定義原子類的Integer作為執行緒組的計數
 3     private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
 4     //定義執行緒組物件
 5     private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"+ GROUP_COUNTER.getAndDecrement());
 6     //定義生產的執行緒計數
 7     private static final AtomicInteger COUNTER = new AtomicInteger(0);
 8 
 9     @Override
10     public Thread createThread(Runnable runnable) {
11         return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement());
12     }
13 }

3. 執行緒池的實現

執行緒池的實現相對比較複雜, 運用了多種設計模式的思想,核心的要點包括:

1. 使用私有內部類的方式來複用Thread類,防止向外暴露Thread的介面;

2. 核心組成部分主要是LinkedList實現的任務佇列和ArrayDeque實現的工作執行緒佇列,構成了主要的儲存主體。

3. 核心的擴容機制需要RunnableQueue + InternalTask + ThreadFactory的結合, 簡單說來就是通過判定任務數是否達到閾值,然後增加工作執行緒的數量。

  1 public class BasicThreadPool implements ThreadPool {
  2     //為了不暴露Thread類的方法, 使用私有內部類WorkThread來繼承Thread類
  3     private WorkThread workThread;
  4     //執行緒池的基本屬性
  5     private final int initSize;
  6     private final int maxSize;
  7     private final int coreSize;
  8     private int activeCount;
  9     //執行緒工廠引用
 10     private final ThreadFactory threadFactory;
 11     //佇列引用
 12     private final RunnableQueue runnableQueue;
 13     //執行緒池銷燬標識
 14     private volatile boolean isShutdown = false;
 15     //工作執行緒的佇列, 使用ArrayDeque實現
 16     private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
 17     //定義了一個預設的拒絕策略
 18     private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
 19     //定義了一個預設的工廠物件
 20     private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
 21     
 22     private final long keepAliveTime;
 23     private final TimeUnit timeUnit;
 24     //預設的構造器, 只需要傳入初始容量, 最大容量, 核心容量和佇列上限
 25     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
 26         this(initSize, maxSize, coreSize, queueSize, DEFAULT_THREAD_FACTORY, 
 27                 DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
 28     }
 29     //完整構造器
 30     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory,
 31             DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) {
 32         this.workThread = new WorkThread();
 33         this.initSize = initSize;
 34         this.maxSize = maxSize;
 35         this.coreSize = coreSize;
 36         this.threadFactory = threadFactory;
 37         this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
 38         this.keepAliveTime = keepAliveTime;
 39         this.timeUnit = timeUnit;
 40         this.init();
 41     }
 42     //執行緒池的初始化方法, 在構造器中被呼叫, 用於啟動工作執行緒
 43     private void init() {
 44         workThread.start();
 45         for(int i = 0; i < initSize; i++) {
 46             newThread();
 47         }
 48     }
 49     //封裝了工作執行緒的啟動方法: 
 50     //1. 使用InternalTask封裝RunnableQueue物件
 51     //2. 通過工廠方法制造工作執行緒並啟動
 52     //3. 工作執行緒入隊, 工作執行緒佇列計數器+1
 53     private void newThread() {
 54         InternalTask internalTask = new InternalTask(runnableQueue);
 55         Thread thread = this.threadFactory.createThread(internalTask);
 56         ThreadTask threadTask = new ThreadTask(thread, internalTask);
 57         threadQueue.offer(threadTask);
 58         this.activeCount++;
 59         thread.start();
 60     }
 61     //工作執行緒出隊的方法
 62     private void removeThread() {
 63         ThreadTask threadTask = threadQueue.remove();
 64         threadTask.internalTask.stop();
 65         this.activeCount--;
 66     }
 67     //核心:通過內部類繼承Thread方法, 設計了自動擴容的機制.
 68     //為了防止過快增加到Max容量, 使用continue來退出迴圈
 69     private class WorkThread extends Thread{
 70         @Override
 71         public void run() {
 72             while(!isShutdown && !isInterrupted()) {
 73                 try {
 74                     timeUnit.sleep(keepAliveTime);
 75                 } catch (InterruptedException e) {
 76                     isShutdown = true;
 77                     break;
 78                 }
 79                 synchronized (this) {
 80                     if(isShutdown) {
 81                         break;
 82                     }
 83                     if(runnableQueue.size() > 0 && activeCount < coreSize) {
 84                         for(int i = initSize; i<coreSize;i++) {
 85                             newThread();
 86                         }
 87                         continue;
 88                     }
 89                     if(runnableQueue.size() > 0 && activeCount < maxSize) {
 90                         for(int i = coreSize; i<maxSize;i++) {
 91                             newThread();
 92                         }
 93                     }
 94                     if(runnableQueue.size()==0 && activeCount > coreSize) {
 95                         for(int i = coreSize; i < activeCount; i++) {
 96                             removeThread();
 97                         }
 98                     }
 99                     
100                 }
101             }
102         }
103     }
104 
105     @Override
106     public void execute(Runnable runnable) {
107         //如果執行緒池已經銷燬, 將丟擲異常
108         if(this.isShutdown) {
109             throw new IllegalStateException("the thread pool is destoried");
110         }
111         this.runnableQueue.offer(runnable);    
112     }
113 
114     @Override
115     public void shutdown() {
116         synchronized(this) {
117             //防止重複銷燬
118             if(isShutdown) {
119                 return;
120             }
121             //重置關閉標識
122             isShutdown = true;
123             //關閉任務工作執行緒
124             threadQueue.forEach(threadTask -> {
125                 threadTask.internalTask.stop();
126                 threadTask.thread.interrupt();
127             });
128             //關閉執行緒池的工作執行緒
129             this.workThread.interrupt();
130         }
131     }
132 
133     @Override
134     public int getInitSize() {
135         if(isShutdown) {
136             throw new IllegalStateException("The thread pool is destroy");
137         }
138         return this.initSize;
139     }
140 
141     @Override
142     public int getMaxSize() {
143         if(isShutdown) {
144             throw new IllegalStateException("The thread pool is destroy");
145         }
146         return this.maxSize;
147     }
148 
149     @Override
150     public int getCoreSize() {
151         if(isShutdown) {
152             throw new IllegalStateException("The thread pool is destroy");
153         }
154         return this.coreSize;
155     }
156 
157     @Override
158     public int getQueueSize() {
159         if(isShutdown) {
160             throw new IllegalStateException("The thread pool is destroy");
161         }
162         return runnableQueue.size();
163     }
164 
165     @Override
166     public int getActiveCount() {
167         synchronized(this) {
168             return this.activeCount;
169         }
170     }
171 
172     @Override
173     public boolean isShutdown() {        
174         return this.isShutdown;
175     }
176 }

執行緒池的測試

編寫一個簡單的測試類,同時啟動20個任務,測試執行緒池的活動狀態:

 1 public class ThreadPoolTest {
 2 
 3     public static void main(String[] args) throws InterruptedException {
 4         
 5         final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);
 6         
 7         for(int i = 0; i < 20; i++) {
 8             threadPool.execute(() -> {
 9                 try {
10                     TimeUnit.SECONDS.sleep(10);
11                     System.out.println(Thread.currentThread().getName() + "is Running and done");
12                 } catch (InterruptedException e) {
13                     e.printStackTrace();
14                 }
15             });
16         }
17         while(true) {
18             System.out.println("getActiveCount: " + threadPool.getActiveCount());
19             System.out.println("getQueueSize: " + threadPool.getQueueSize());
20             System.out.println("getCoreSize: " + threadPool.getCoreSize());
21             System.out.println("getMaxSize: " + threadPool.getMaxSize());
22             System.out.println("================================================");
23             TimeUnit.SECONDS.sleep(5);            
24         }
25     }
26 }

輸出結果如下

thread-pool--1is Running and done
thread-pool-0is Running and done
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
================================================
getActiveCount: 4
getQueueSize: 14
getCoreSize: 4
getMaxSize: 6
================================================
thread-pool--3is Running and done
thread-pool--2is Running and done
thread-pool--1is Running and done
thread-pool-0is Running and done
getActiveCount: 6
getQueueSize: 8
getCoreSize: 4
getMaxSize: 6