1. 程式人生 > >深入理解Java執行緒池(1):ThreadPoolExecutor整體流程梳理,建立worker相關方法

深入理解Java執行緒池(1):ThreadPoolExecutor整體流程梳理,建立worker相關方法

執行緒池作為一個執行緒的容器,主要的作用就是防止頻繁建立執行緒,節省時間資源和cpu資源。雖然一定程度上佔用了記憶體,但實際情況下利遠遠大於弊。

構造方法

public ThreadPoolExecutor(
	int corePoolSize,					//核心執行緒數量
	int maximumPoolSize,				//最大執行緒數量
	long keepAliveTime,					//最大存活時間
	TimeUnit unit,						//時間單位
	BlockingQueue<Runnable> workQueue,	//執行緒佇列
	ThreadFactory threadFactory,
//執行緒工廠 RejectedExecutionHandler handler) //超過最大執行緒後,處理方式

上邊的是最核心的構造方法,執行緒池的運作方式其實從構造方法就可以略知一二。我先簡述一下執行緒池的運作方法,以及設定的引數的作用。

  • 建立執行緒池後,其實執行緒池中執行緒數量為0;
  • 當有一個任務被提交後,會建立一個worker執行緒,worker會被放入執行緒池中並開始執行這個任務,任務完成後會等待我們設定的任務佇列(workQueue)中的任務。
  • 這樣不停的提交任務,worker的數量也會不斷增多,直到增大到核心執行緒數量(corePoolSize),此時不會再增加執行緒池中的worker執行緒,多出的任務會放入佇列中,等到核心執行緒池中的worker空閒下來再執行放入佇列中的任務。如果佇列是無限佇列可能會出現建立過多執行緒撐爆記憶體的現象。
  • 當佇列滿員後,會繼續往執行緒池中增加worker,直到達到執行緒池最大執行緒數量(maximumPoolSize)。此時,會根據設定的RejectedExecutionHandler實現類(handler)執行不同的拒絕策略,例如:丟擲異常,或者使用當前執行緒執行。若最大執行緒數量過大同樣會出現worker數量過多撐爆記憶體的現象。
  • 當執行緒池的worker數量超過核心執行緒數量時,有worker的任務執行完畢後獲取佇列中的任務超過指定時間(keepAliveTime和unit確定的時間),這個worker就會被消滅。
  • 執行緒工廠(threadFactory)就是建立執行緒的地方,可使用Guava的ThreadFactoryBuilder().setNameFormat(“demo-thread-%d”).build()建立threadFactory。也可以使用自帶的DefaultThreadFactory。

以上就是執行緒池執行的大概流程。佇列數量過大或者最大執行緒數量過大,會讓伺服器掛掉,由於一個服務幹爆整個伺服器實屬不智,應當謹慎使用Executors中的建立方式。

全域性變數

下邊先看看ThreadPoolExecutor中的全域性變數,構造方法中涉及的就不在贅述

//使用了執行緒安全的AtomicInteger類,保證了ctl的不會因多執行緒而出現問題
//ctl變數值的前 29 位表示工作執行緒數量 workerCount, 剩餘高位來表示執行緒池狀態runState。
//初始化時預設是running狀態,worker數量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//running狀態;接受新任務,並處理佇列任務,是正常執行狀態
private static final int RUNNING    = -1 << COUNT_BITS;	
//shutdown狀態;不接受新任務,但會處理佇列任務
private static final int SHUTDOWN   =  0 << COUNT_BITS;	
//stop狀態;不接受新任務,不會處理佇列任務,而且會中斷正在處理過程中的任務
private static final int STOP       =  1 << COUNT_BITS;	
//tidying狀態;所有的任務已結束,workerCount為0,執行緒過渡到TIDYING狀態,將會執行terminated()鉤子方法(此方法為空)
private static final int TIDYING    =  2 << COUNT_BITS;	
//terminated狀態;terminated()方法已經完成後變更成此狀態。
private static final int TERMINATED =  3 << COUNT_BITS;	

//儲存worker的容器,不是執行緒安全,所以其操作通常需要加鎖
private final HashSet<Worker> workers = new HashSet<Worker>();
//全域性鎖,會保證關鍵操作不會因併發而混亂,如對workers的操作。
private final ReentrantLock mainLock = new ReentrantLock();		

其他全域性變數可能並非關鍵變數,如果遇到會在程式碼裡說明。 下面開始對原始碼進行分析

execute方法

這是執行緒池的最核心方法,加入Runnable任務,使用執行緒池中的worker執行。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
	//計算 workerCount 和 runState 時通過掩碼計算。
    int c = ctl.get();
    //若worker數量小於核心執行緒池中執行緒數,增加worker,若成功則直接返回。失敗繼續執行
    if (workerCountOf(c) < corePoolSize) {				
        if (addWorker(command, true))					
            return;
        c = ctl.get();									
    }
    //判斷若是running狀態,嘗試向佇列中新增任務。執行到此處,核心執行緒池已滿,嘗試向佇列中新增任務
    if (isRunning(c) && workQueue.offer(command)) {	
        int recheck = ctl.get();
        //由於沒有加鎖,需要重新判斷現在是否處於執行狀態,若不是需要移除佇列中任務,移除後執行插入失敗操作reject
        if (! isRunning(recheck) && remove(command))	
            reject(command);
		//如果正常需要檢視工作的worker數量,若為0,則加入一個空閒Worker。
        else if (workerCountOf(recheck) == 0)			
            addWorker(null, false);
    }
	//執行到此處說明,核心執行緒已經飽和,阻塞佇列已經滿員,嘗試再次增加worker環節佇列壓力
    else if (!addWorker(command, false))reject(command);
}

增加空任務worker的邏輯比較奇怪,可能是狀態突然變更為shutdown,佇列中的任務已經被加入,shutdown操作意外殺死了所有worker執行緒,不得不執行這個操作。某種程度上保證了不會出現插入任務卻沒有worker的情況。根據addWorker方法中判斷是否新增worker的邏輯推斷得出,尚未完全明白為何會發生這種情況。

addWorker

addWorker方法會嘗試向執行緒池中新增執行緒。 呼叫此方法的方法往往不會對執行緒池狀態做出詳細判斷,所以addWorker方法需要對當前狀態做出明確判斷,根據不同狀態執行不同操作。 入參firstTask表示worker的第一個任務,可能為null,表示只增加worker,不設定初始任務,worker執行的任務從佇列中獲取;入參core表示是否加入核心執行緒池

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
/* 判斷何時不新增worker直接返回。
 * 若執行緒池狀態非running時,一下三個條件都滿足才繼續執行後邊的程式碼,否則其他非running狀態都是值節返回失敗
 * 1 執行緒池是shutdown狀態 2 建立的worker的任務為空 3 任務佇列中有任務未完成
 * 由此可以推斷出execute加入無任務worker的原因。
 */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
//判斷執行緒池數量,大於執行緒池能儲存最大數量,直接返回
//判斷入參core,如果true執行緒池數量小於核心執行緒數,如果false小於設定最大執行緒數
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
//cas增加worker數量,成功跳出,跳過所有迴圈;cas保證了併發增加worker不會發生異常
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); 
//執行到此處說明ctl改變,若是state改變則跳出到大迴圈,從新判斷;若是worker數量改變則在小迴圈中繼續執行
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
//worker是否啟動
    boolean workerStarted = false;
//woker是否被新增入worker容器
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
//加上鎖機制保證不會發生衝突
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
//最後的判斷,和mainLock鎖一起保證不會發生併發衝突
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
//在這裡開始真正增加worker。
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
//啟動worker執行緒,worker的run方法會呼叫runWorker方法,不停的領取任務
                t.start();					
                workerStarted = true;
            }
        }
    } finally {
//worker啟動失敗,此時呼叫addWorkerFailed方法。注意,此時ctl中worker數量已經+1,但worker容器中不一定新增。
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorkerFailed

這個方法被呼叫時,worker因不明原因啟動或新增失敗。執行緒池總worker數量加一了。所以需要嘗試把worker容器中的啟動失敗的worker剔除,worker數量減一。

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();		
        tryTerminate();			//如有需要嘗試關閉執行緒池
    } finally {
        mainLock.unlock();
    }
}