Java執行緒池ThreadPoolExecutor使用和分析(二)
相關文章目錄:
execute()是 java.util.concurrent.Executor介面中唯一的方法,JDK註釋中的描述是“在未來的某一時刻執行命令command”,即向執行緒池中提交任務,在未來某個時刻執行,提交的任務必須實現Runnable介面,該提交方式不能獲取返回值。下面是對execute()方法內部原理的分析,分析前先簡單介紹執行緒池有哪些狀態,在一系列執行過程中涉及執行緒池狀態相關的判斷。以下分析基於JDK 1.7
以下是本文的目錄大綱:
若有不正之處請多多諒解,歡迎批評指正、互相討論。
請尊重作者勞動成果,轉載請標明原文連結:
http://www.cnblogs.com/trust-freedom/p/6681948.html
一、執行緒池的執行流程
1、如果執行緒池中的執行緒數量少於corePoolSize,就建立新的執行緒來執行新新增的任務 2、如果執行緒池中的執行緒數量大於等於corePoolSize,但佇列workQueue未滿,則將新新增的任務放到workQueue中 3、如果執行緒池中的執行緒數量大於等於corePoolSize,且佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的執行緒來處理被新增的任務 4、如果執行緒池中的執行緒數量等於了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕策略
二、執行緒池狀態
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
1 ;
|
其中ctl這個AtomicInteger的功能很強大,其高3位用於維護執行緒池執行狀態,低29位維護執行緒池中執行緒數量
1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的執行緒池會接收新任務,也會處理在阻塞佇列中等待處理的任務
2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態的執行緒池不會再接收新任務,但還會處理已經提交到阻塞佇列中等待處理的任務
3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的執行緒池不會再接收新任務,不會處理在阻塞佇列中等待的任務,而且還會中斷正在執行的任務
4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將呼叫terminated()方法
5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法呼叫完成後變成此狀態
這些狀態均由int型表示,大小關係為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循執行緒池從 執行 到 終止這個過程。
runStateOf(int c) 方法:c & 高3位為1,低29位為0的~CAPACITY,用於獲取高3位儲存的執行緒池狀態
workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用於獲取低29位的執行緒數量
ctlOf(int rs, int wc)方法:引數rs表示runState,引數wc表示workerCount,即根據runState和workerCount打包合併成ctl
三、任務提交內部原理
1、execute() -- 提交任務
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
|
execute(Runnable command)
引數: command 提交執行的任務,不能為空執行流程: 1、如果執行緒池當前執行緒數量少於corePoolSize,則addWorker(command, true)建立新worker執行緒,如建立成功返回,如沒建立成功,則執行後續步驟; addWorker(command, true)失敗的原因可能是: A、執行緒池已經shutdown,shutdown的執行緒池不再接收新任務 B、workerCountOf(c) < corePoolSize 判斷後,由於併發,別的執行緒先建立了worker執行緒,導致workerCount>=corePoolSize 2、如果執行緒池還在running狀態,將task加入workQueue阻塞佇列中,如果加入成功,進行double-check,如果加入失敗(可能是佇列已滿),則執行後續步驟; double-check主要目的是判斷剛加入workQueue阻塞佇列的task是否能被執行 A、如果執行緒池已經不是running狀態了,應該拒絕新增新任務,從workQueue中刪除任務 B、如果執行緒池是執行狀態,或者從workQueue中刪除任務失敗(剛好有一個執行緒執行完畢,並消耗了這個任務),確保還有執行緒執行任務(只要有一個就夠了) 3、如果執行緒池不是running狀態 或者 無法入佇列,嘗試開啟新執行緒,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當前command
2、addWorker() -- 新增worker執行緒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
|
addWorker(Runnable firstTask, boolean core)引數: firstTask: worker執行緒的初始任務,可以為空 core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限addWorker方法有4種傳參的方式:
1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)
在execute方法中就使用了前3種,結合這個核心方法進行以下分析 第一個:執行緒數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false 第二個:當佇列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果執行緒池也滿了的話就返回false 第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務佇列裡拿任務,這樣就相當於建立了一個新的執行緒,只是沒有馬上分配任務 第四個:這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為執行緒池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行執行流程: 1、判斷執行緒池當前是否為可以新增worker執行緒的狀態,可以則繼續下一步,不可以return false: A、執行緒池狀態>shutdown,可能為stop、tidying、terminated,不能新增worker執行緒 B、執行緒池狀態==shutdown,firstTask不為空,不能新增worker執行緒,因為shutdown狀態的執行緒池不接收新任務 C、執行緒池狀態==shutdown,firstTask==null,workQueue為空,不能新增worker執行緒,因為firstTask為空是為了新增一個沒有任務的執行緒再從workQueue獲取task,而workQueue為空,說明新增無任務執行緒已經沒有意義 2、執行緒池當前執行緒數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步 3、線上程池的ReentrantLock保證下,向Workers Set中新增新建立的worker例項,新增完成後解鎖,並啟動worker執行緒,如果這一切都成功了,return true,如果新增worker入Set失敗或啟動失敗,呼叫addWorkerFailed()邏輯
3、內部類Worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
|
Worker類 Worker類本身既實現了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),所以其既是一個可執行的任務,又可以達到鎖的效果new Worker() 1、將AQS的state置為-1,在runWoker()前不允許中斷 2、待執行的任務會以引數傳入,並賦予firstTask 3、用Worker這個Runnable建立Thread
之所以Worker自己實現Runnable,並建立Thread,在firstTask外包一層,是因為要通過Worker控制中斷,而firstTask這個工作任務只是負責執行業務Worker控制中斷主要有以下幾方面: 1、初始AQS狀態為-1,此時不允許中斷interrupt(),只有在worker執行緒啟動了,執行了runWoker(),將state置為0,才能中斷 不允許中斷體現在: A、shutdown()執行緒池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,沒發interrupt() B、shutdownNow()執行緒池時,不用tryLock()上鎖,但呼叫worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯 2、為了防止某種情況下,在執行中的worker被中斷,runWorker()每次執行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先獲取worker的鎖,這樣就防止了中斷正在執行的執行緒
Worker實現的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進入其它一些需要加鎖的方法
Worker和Task的區別: Worker是執行緒池中的執行緒,而Task雖然是runnable,但是並沒有真正執行,只是被Worker呼叫了run方法,後面會看到這部分的實現。
4、runWorker() -- 執行任務
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
|
runWorker(Worker w)執行流程: 1、Worker執行緒啟動後,通過Worker類的run()方法呼叫runWorker(this) 2、執行任務之前,首先worker.unlock(),將AQS的state置為0,允許中斷當前worker執行緒 3、開始執行firstTask,呼叫task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,為了防止在任務執行時被執行緒池一些中斷操作中斷 4、在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法 5、無論在beforeExecute()、task.run()、afterExecute()發生異常上拋,都會導致worker執行緒終止,進入processWorkerExit()處理worker退出的流程 6、如正常執行完當前task後,會通過getTask()從阻塞佇列中獲取新任務,當佇列中沒有任務,且獲取任務超時,那麼當前worker也會進入退出流程
5、getTask() -- 獲取任務
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
|