java執行緒池詳細入門教程即原始碼解析
##1、執行緒池概念
執行緒池是執行緒的集合,通過執行緒池我們不需要自己建立執行緒,將任務提交給執行緒池即可。為什麼要使用執行緒池,首先,使用執行緒池可以重複利用已有的執行緒繼續執行任務,避免執行緒在建立和銷燬時造成的消耗。其次,由於沒有執行緒建立和銷燬時的消耗,可以提高系統響應速度。最後,通過執行緒可以對執行緒進行合理的管理,根據系統的承受能力調整可執行執行緒數量的大小等。
##2、執行緒池使用示例
這裡做兩個測試,首先寫一個實現Runnable介面的類:
class MyTask implements Runnable{ @Override public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName()); } }
main中程式碼:
ExecutorService pool = Executors.newFixedThreadPool(5); Runnable myTask = new MyTask(); Runnable myTask1 = new MyTask(); Runnable myTask2 = new MyTask(); Runnable myTask3 = new MyTask(); Runnable myTask4 = new MyTask(); pool.execute(myTask); pool.execute(myTask1); pool.execute(myTask2); pool.execute(myTask3); pool.execute(myTask4);
這裡newFixedThreadPool中的5代表可執行執行緒數量。所以結果:
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-2
可以看到這裡又五個執行緒在執行。這裡如果將可執行執行緒數量變成2,那麼結果如下:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
可以看到執行緒1和執行緒2都被重複利用了,沒有建立更多的執行緒出來。
##3、執行緒池的狀態
執行緒池的狀態及狀態間的切換如下圖所示:
可以看到執行緒池有五種狀態RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,不同於執行緒的轉檯切換的是,執行緒池的狀態可操作的其實只有兩種狀態,RUNNING和TERMINATED,執行緒池一旦建立就會處於RUNNING狀態,而TERMINATED是終止,執行緒池呼叫shutdown或者shutdownNow執行緒池其實就會慢慢的切換到終止狀態,中間三種狀態只是執行緒由執行態過渡到終止態的中間狀態,用來進行一些處理事項的。RUNNING狀態是正常執行狀態,能夠接受新的任務和處理任務,SHUTDOWN是關閉狀態,不能接受新的任務,將正在執行的任務處理完畢後進入到TIDYING整理狀態,STOP是停止狀態,不接受任務,也不處理任務,直接中斷任務,然後進入到TIDYING整理狀態,TIDYING狀態會自動呼叫terminate方法,呼叫完後進入到終止狀態。
##4、通過原始碼理解執行緒池
首先是執行緒的建立,執行緒的建立一般通過一個工廠類建立,
ExecutorService pool = Executors.newFixedThreadPool(2);
進去看這個函式:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到還是直接new了一個執行緒池出來,再看這個執行緒池的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
首先對這幾個引數進行說明:
corePoolSize:核心池大小,表示執行緒池正常情況下能執行的執行緒最大多少。
maximumPoolSize:最大池大小,表示執行緒池最大能執行的執行緒數。
keepAliveTime:表示執行緒執行任務後的空閒存活時間,過了這個時間執行緒將被銷燬,這個引數的存在也直接實現了概述中所說的前兩點優點,執行緒的消耗可以分為A,B,C三個階段,A代表建立,B代表執行,C代表消耗,如果有很多個任務需要執行時,B執行所帶來的消耗代價小於A,C所帶來的消耗代價,那麼就通過這個引數,讓執行緒保持一段存活時間,方便執行後面的任務。
unit:執行緒空閒存活時間的單位。
workQueue:這個是任務佇列,將後面的任務加入到這個佇列中。
threadFactory:真正用於建立執行緒的引數。
handler:表示已經不能在接收任務時,呼叫的處理類。
在這個構造方法中分別對這幾個引數進行了初始化。**這裡再介紹下這幾個引數的關係,當用戶建立任務時,如果當前的執行的執行緒數小於核心池數量,那麼會建立新的執行緒來執行這個任務,如果當前執行的執行緒數大於核心池的數量,那麼就將任務新增到任務佇列中,如果任務佇列已經裝滿且當前執行執行緒數小於最大池的數量,那麼就再建立執行緒來執行這個任務,如果已經等於了最大池的數量,那麼將拒絕這個任務並且執行handle處理類。**執行緒的建立介紹了再來看任務的執行方法execute:
public void execute(Runnable command) {
//當任務為空,丟擲空指標異常
if (command == null)
throw new NullPointerException();
//這個是獲取到ctl引數的值,這個值是32位的
//高三位儲存了執行緒池的狀態,其餘位儲存了執行緒池中執行的執行緒的數量
int c = ctl.get();
//當執行緒執行數量小於核心池時
if (workerCountOf(c) < corePoolSize) {
//建立新的執行緒來執行這個任務
if (addWorker(command, true))
//然後直接返回
return;
c = ctl.get();
}
//如果當前執行執行緒已經超過了核心池大小,並且執行緒池是執行的
//將任務加入到任務佇列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果執行緒池沒有運行了,將任務移除
if (! isRunning(recheck) && remove(command))
//呼叫方法處理
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果新增佇列失敗了,那麼再建立一個新的執行緒
else if (!addWorker(command, false))
//如果失敗了,也就是最大池滿了,那麼呼叫拒絕的處理方法。
reject(command);
}
這裡多次呼叫到addWorker方法,這裡繼續看addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
//前面這一片都是驗證性的工作,可以不重點看
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//從這裡開始重點看
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//獲取可重入互斥鎖
final ReentrantLock mainLock = this.mainLock;
//建立一個新的執行緒來之心這個任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//執行緒不為空,加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將worker新增到set集合中
workers.add(w);
//獲取集合的worker數量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//worker新增成功執行任務
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
到這裡執行緒池的入門介紹結束。