Java併發程式設計之深入執行緒池原理及實現
Java執行緒池在實際的應用開發中十分廣泛。雖然Java1.5之後在JUC包中提供了內建執行緒池可以拿來就用,但是這之前仍有許多老的應用和系統是需要程式設計師自己開發的。因此,基於執行緒池的需求背景、技術要求瞭解執行緒池原理和實現,一方面可以更為深刻理解Java多執行緒開發,有助於解決業務系統中因為執行緒問題所產生的bug;另一方面也有利於對遺留程式碼的重構。
如果需要先行了解Java併發程式設計的基礎知識,可參考以下隨筆:
執行緒池原理
所謂的執行緒池,跟JDBC連線池、代理池等一樣,屬於一種“池”的設計模式。在設計好的資料結構中存放一定數量的執行緒,並且根據任務需要自動調整執行緒數量的多少,直到峰值。具體說來,執行緒池需要滿足若干條件:
1. 任務佇列:用於快取提交的任務
2. QueueSize:任務佇列存放的Runnable任務例項的數量,需要有限制值防止記憶體溢位。
3. 數量管理:建立執行緒時初始的數量init;執行緒池自動擴充時最大的執行緒數量max;空閒時的活躍執行緒或核心執行緒數量core。三者滿足init<=core<=max
4. 任務拒絕策略:如果執行緒數量達到上限且任務佇列已滿,需要有拒絕策略通知任務提交者,這個在工程實踐中非常重要。
5. 執行緒工廠:用於個性化定製執行緒,如設定守護執行緒、執行緒名稱等。
6. Keepedalive時間:執行緒各個重要引數自動維護的時間間隔。
執行緒池原理圖如下:
執行緒池實現
“模組設計,類圖先行”。明確了執行緒池需要實現的功能之後,就可以畫出執行緒池的草圖了,核心介面及實現類如下:
ThreadPool介面
ThreadPool介面主要定義一個執行緒池的基本屬性,如任務提交、初始容量、最大容量、核心容量等。實現程式碼如下:
1 public interface ThreadPool { 2 3 //submit tasks to thread pool 4 void execute(Runnable runnable); 5 //close pool 6 void shutdown(); 7 //get the initial size of pool 8 int getInitSize(); 9 //get the max size of pool 10 int getMaxSize(); 11 //get the core size of pool 12 int getCoreSize(); 13 //get the cache tasks queue of pool 14 int getQueueSize(); 15 //get the active thread volume of pool 16 int getActiveCount(); 17 //check if pool has been shutdown 18 boolean isShutdown(); 19 }
RunnableQueue介面
這個介面的作用與BlockingQueue介面一樣,用於儲存提交的Runnable實現類任務。
1 public interface RunnableQueue { 2 //提交任務到佇列 3 void offer(Runnable runnable); 4 //從佇列中獲取任務 5 Runnable take() throws InterruptedException; 6 //返回佇列中任務數 7 int size(); 8 }
ThreadFactory介面
定義了個性化建立執行緒的工廠方法
1 @FunctionalInterface 2 public interface ThreadFactory { 3 4 Thread createThread(Runnable runnable); 5 6 }
DenyPolicy介面
定義了執行緒池的拒絕策略,即當任務佇列達到上限時,採取何種措施拒絕。注意介面內定義了內部類作為外圍介面的實現類(該類自動為public和static,像這種巢狀類的實現,可查詢《Java程式設計思想》)。
1 @FunctionalInterface 2 public interface DenyPolicy { 3 4 void reject(Runnable runnable, ThreadPool threadPool); 5 //定義巢狀類作為拒絕策略的實現類 6 //1.拒絕並丟棄任務 7 class DiscardDenyPolicy implements DenyPolicy{ 8 9 @Override 10 public void reject(Runnable runnable, ThreadPool threadPool) { 11 12 } 13 } 14 15 //2.拒絕並丟擲自定義異常 16 class AbortDenyPolicy implements DenyPolicy{ 17 18 @Override 19 public void reject(Runnable runnable, ThreadPool threadPool) { 20 throw new RunnableDenyException("The runnable " + runnable + " will abort."); 21 } 22 } 23 24 //3.拒絕, 使用提交者所線上程來完成執行緒任務. 25 class RunnerDenyPolicy implements DenyPolicy{ 26 27 @Override 28 public void reject(Runnable runnable, ThreadPool threadPool) { 29 30 if(!threadPool.isShutdown()) { 31 runnable.run(); 32 } 33 } 34 } 35 }
其實實現了自定義異常類RunnableDenyException:
1 public class RunnableDenyException extends RuntimeException { 2 3 private static final long serialVersionUID = 112311231231412L; 4 5 public RunnableDenyException(String message) { 6 super(message); 7 } 8 }
InternalTask實現類
Runnable的實現類,會使用到RunnableQueue,它的作用其實是封裝了一個任務例項,把Runnable任務的run方法封裝到自己的Run方法實現中,並且提供了一個stop方法,用於線上程池銷燬或數量維護時停止當前執行緒。
1 public class InternalTask implements Runnable { 2 //組合一個RunnableQueue的引用 3 private final RunnableQueue runnableQueue; 4 //使用volatile關鍵字修飾開關變數 5 private volatile boolean running = true; 6 7 public InternalTask(RunnableQueue runnableQueue) { 8 this.runnableQueue = runnableQueue; 9 } 10 @Override 11 public void run() { 12 // if current task match "both running and isInterrupt" are true 13 // continue to take runnable from queue and run 14 while(running && !Thread.currentThread().isInterrupted()) { 15 try { 16 Runnable task = runnableQueue.take(); 17 task.run(); 18 } catch (Exception e) { 19 running = false; 20 break; 21 } 22 } 23 24 } 25 //停止執行緒的開關方法 26 public void stop() { 27 this.running = false; 28 } 29 }
到這裡,一個基本執行緒池的骨架就搭好了,接下來主要是實現各介面,實現具體的方法。
1. 佇列的實現類LinkedRunnableQueue
1 public class LinkedRunnableQueue implements RunnableQueue { 2 //設定佇列上限 3 private final int limit; 4 //設定拒絕策略的引用 5 private final DenyPolicy denyPolicy; 6 //使用LinkedList作為佇列的具體實現類 7 private final LinkedList<Runnable> runnableList = new LinkedList<>(); 8 //設定執行緒池的引用 9 private final ThreadPool threadPool; 10 //構造方法時賦初始值 11 public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) { 12 this.limit = limit; 13 this.denyPolicy = denyPolicy; 14 this.threadPool = threadPool; 15 } 16 17 @Override 18 public void offer(Runnable runnable) { 19 //使用同步鎖, 確保入隊的執行緒安全 20 synchronized (runnableList) { 21 //當達到佇列上限, 呼叫拒絕策略;否則加入隊尾, 並喚醒阻塞中的執行緒. 22 if(runnableList.size() >= limit) { 23 denyPolicy.reject(runnable, threadPool); 24 }else { 25 runnableList.addLast(runnable); 26 runnableList.notifyAll(); 27 } 28 } 29 } 30 31 @Override 32 public Runnable take() throws InterruptedException { 33 synchronized (runnableList) { 34 35 while(runnableList.isEmpty()) { 36 try { 37 //如果佇列中沒有可執行任務, 執行緒掛起, 進入runnableList關聯的monitor waitset中等待被喚醒 38 runnableList.wait(); 39 } catch (InterruptedException e) { 40 //如果被中斷, 需要丟擲異常 41 throw e; 42 } 43 } 44 return runnableList.removeFirst(); 45 } 46 } 47 48 @Override 49 public int size() { 50 synchronized (runnableList) { 51 //返回佇列中的任務數量 52 return runnableList.size(); 53 } 54 } 55 }
2. 執行緒工廠的實現
1 public class DefaultThreadFactory implements ThreadFactory { 2 //定義原子類的Integer作為執行緒組的計數 3 private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1); 4 //定義執行緒組物件 5 private static final ThreadGroup group = new ThreadGroup("MyThreadPool-"+ GROUP_COUNTER.getAndDecrement()); 6 //定義生產的執行緒計數 7 private static final AtomicInteger COUNTER = new AtomicInteger(0); 8 9 @Override 10 public Thread createThread(Runnable runnable) { 11 return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement()); 12 } 13 }
3. 執行緒池的實現
執行緒池的實現相對比較複雜, 運用了多種設計模式的思想,核心的要點包括:
1. 使用私有內部類的方式來複用Thread類,防止向外暴露Thread的介面;
2. 核心組成部分主要是LinkedList實現的任務佇列和ArrayDeque實現的工作執行緒佇列,構成了主要的儲存主體。
3. 核心的擴容機制需要RunnableQueue + InternalTask + ThreadFactory的結合, 簡單說來就是通過判定任務數是否達到閾值,然後增加工作執行緒的數量。
1 public class BasicThreadPool implements ThreadPool { 2 //為了不暴露Thread類的方法, 使用私有內部類WorkThread來繼承Thread類 3 private WorkThread workThread; 4 //執行緒池的基本屬性 5 private final int initSize; 6 private final int maxSize; 7 private final int coreSize; 8 private int activeCount; 9 //執行緒工廠引用 10 private final ThreadFactory threadFactory; 11 //佇列引用 12 private final RunnableQueue runnableQueue; 13 //執行緒池銷燬標識 14 private volatile boolean isShutdown = false; 15 //工作執行緒的佇列, 使用ArrayDeque實現 16 private final Queue<ThreadTask> threadQueue = new ArrayDeque<>(); 17 //定義了一個預設的拒絕策略 18 private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); 19 //定義了一個預設的工廠物件 20 private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(); 21 22 private final long keepAliveTime; 23 private final TimeUnit timeUnit; 24 //預設的構造器, 只需要傳入初始容量, 最大容量, 核心容量和佇列上限 25 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) { 26 this(initSize, maxSize, coreSize, queueSize, DEFAULT_THREAD_FACTORY, 27 DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS); 28 } 29 //完整構造器 30 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize, ThreadFactory threadFactory, 31 DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) { 32 this.workThread = new WorkThread(); 33 this.initSize = initSize; 34 this.maxSize = maxSize; 35 this.coreSize = coreSize; 36 this.threadFactory = threadFactory; 37 this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this); 38 this.keepAliveTime = keepAliveTime; 39 this.timeUnit = timeUnit; 40 this.init(); 41 } 42 //執行緒池的初始化方法, 在構造器中被呼叫, 用於啟動工作執行緒 43 private void init() { 44 workThread.start(); 45 for(int i = 0; i < initSize; i++) { 46 newThread(); 47 } 48 } 49 //封裝了工作執行緒的啟動方法: 50 //1. 使用InternalTask封裝RunnableQueue物件 51 //2. 通過工廠方法制造工作執行緒並啟動 52 //3. 工作執行緒入隊, 工作執行緒佇列計數器+1 53 private void newThread() { 54 InternalTask internalTask = new InternalTask(runnableQueue); 55 Thread thread = this.threadFactory.createThread(internalTask); 56 ThreadTask threadTask = new ThreadTask(thread, internalTask); 57 threadQueue.offer(threadTask); 58 this.activeCount++; 59 thread.start(); 60 } 61 //工作執行緒出隊的方法 62 private void removeThread() { 63 ThreadTask threadTask = threadQueue.remove(); 64 threadTask.internalTask.stop(); 65 this.activeCount--; 66 } 67 //核心:通過內部類繼承Thread方法, 設計了自動擴容的機制. 68 //為了防止過快增加到Max容量, 使用continue來退出迴圈 69 private class WorkThread extends Thread{ 70 @Override 71 public void run() { 72 while(!isShutdown && !isInterrupted()) { 73 try { 74 timeUnit.sleep(keepAliveTime); 75 } catch (InterruptedException e) { 76 isShutdown = true; 77 break; 78 } 79 synchronized (this) { 80 if(isShutdown) { 81 break; 82 } 83 if(runnableQueue.size() > 0 && activeCount < coreSize) { 84 for(int i = initSize; i<coreSize;i++) { 85 newThread(); 86 } 87 continue; 88 } 89 if(runnableQueue.size() > 0 && activeCount < maxSize) { 90 for(int i = coreSize; i<maxSize;i++) { 91 newThread(); 92 } 93 } 94 if(runnableQueue.size()==0 && activeCount > coreSize) { 95 for(int i = coreSize; i < activeCount; i++) { 96 removeThread(); 97 } 98 } 99 100 } 101 } 102 } 103 } 104 105 @Override 106 public void execute(Runnable runnable) { 107 //如果執行緒池已經銷燬, 將丟擲異常 108 if(this.isShutdown) { 109 throw new IllegalStateException("the thread pool is destoried"); 110 } 111 this.runnableQueue.offer(runnable); 112 } 113 114 @Override 115 public void shutdown() { 116 synchronized(this) { 117 //防止重複銷燬 118 if(isShutdown) { 119 return; 120 } 121 //重置關閉標識 122 isShutdown = true; 123 //關閉任務工作執行緒 124 threadQueue.forEach(threadTask -> { 125 threadTask.internalTask.stop(); 126 threadTask.thread.interrupt(); 127 }); 128 //關閉執行緒池的工作執行緒 129 this.workThread.interrupt(); 130 } 131 } 132 133 @Override 134 public int getInitSize() { 135 if(isShutdown) { 136 throw new IllegalStateException("The thread pool is destroy"); 137 } 138 return this.initSize; 139 } 140 141 @Override 142 public int getMaxSize() { 143 if(isShutdown) { 144 throw new IllegalStateException("The thread pool is destroy"); 145 } 146 return this.maxSize; 147 } 148 149 @Override 150 public int getCoreSize() { 151 if(isShutdown) { 152 throw new IllegalStateException("The thread pool is destroy"); 153 } 154 return this.coreSize; 155 } 156 157 @Override 158 public int getQueueSize() { 159 if(isShutdown) { 160 throw new IllegalStateException("The thread pool is destroy"); 161 } 162 return runnableQueue.size(); 163 } 164 165 @Override 166 public int getActiveCount() { 167 synchronized(this) { 168 return this.activeCount; 169 } 170 } 171 172 @Override 173 public boolean isShutdown() { 174 return this.isShutdown; 175 } 176 }
執行緒池的測試
編寫一個簡單的測試類,同時啟動20個任務,測試執行緒池的活動狀態:
1 public class ThreadPoolTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 5 final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000); 6 7 for(int i = 0; i < 20; i++) { 8 threadPool.execute(() -> { 9 try { 10 TimeUnit.SECONDS.sleep(10); 11 System.out.println(Thread.currentThread().getName() + "is Running and done"); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 }); 16 } 17 while(true) { 18 System.out.println("getActiveCount: " + threadPool.getActiveCount()); 19 System.out.println("getQueueSize: " + threadPool.getQueueSize()); 20 System.out.println("getCoreSize: " + threadPool.getCoreSize()); 21 System.out.println("getMaxSize: " + threadPool.getMaxSize()); 22 System.out.println("================================================"); 23 TimeUnit.SECONDS.sleep(5); 24 } 25 } 26 }
輸出結果如下
thread-pool--1is Running and done thread-pool-0is Running and done getActiveCount: 4 getQueueSize: 14 getCoreSize: 4 getMaxSize: 6 ================================================ getActiveCount: 4 getQueueSize: 14 getCoreSize: 4 getMaxSize: 6 ================================================ thread-pool--3is Running and done thread-pool--2is Running and done thread-pool--1is Running and done thread-pool-0is Running and done getActiveCount: 6 getQueueSize: 8 getCoreSize: 4 getMaxSize: 6