1. 程式人生 > >Java執行緒池ThreadPoolExecutor使用和分析(二)

Java執行緒池ThreadPoolExecutor使用和分析(二)

   相關文章目錄:

    execute()是 java.util.concurrent.Executor介面中唯一的方法,JDK註釋中的描述是“在未來的某一時刻執行命令command”,即向執行緒池中提交任務,在未來某個時刻執行,提交的任務必須實現Runnable介面,該提交方式不能獲取返回值。下面是對execute()方法內部原理的分析,分析前先簡單介紹執行緒池有哪些狀態,在一系列執行過程中涉及執行緒池狀態相關的判斷。以下分析基於JDK 1.7

    以下是本文的目錄大綱:

    若有不正之處請多多諒解,歡迎批評指正、互相討論。

    請尊重作者勞動成果,轉載請標明原文連結:

    http://www.cnblogs.com/trust-freedom/p/6681948.html

一、執行緒池的執行流程

1、如果執行緒池中的執行緒數量少於corePoolSize,就建立新的執行緒來執行新新增的任務 2、如果執行緒池中的執行緒數量大於等於corePoolSize,但佇列workQueue未滿,則將新新增的任務放到workQueue中 3、如果執行緒池中的執行緒數量大於等於corePoolSize,且佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的執行緒來處理被新增的任務 4、如果執行緒池中的執行緒數量等於了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕策略

二、執行緒池狀態

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY   = (1 << COUNT_BITS) -

1;

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN   =  0 << COUNT_BITS;

private static final int STOP       =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl

private static int runStateOf(int c)     { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl這個AtomicInteger的功能很強大,其高3位用於維護執行緒池執行狀態,低29位維護執行緒池中執行緒數量

1、RUNNING:-1<<COUNT_BITS,即高3位為1,低29位為0,該狀態的執行緒池會接收新任務,也會處理在阻塞佇列中等待處理的任務

2、SHUTDOWN:0<<COUNT_BITS,即高3位為0,低29位為0,該狀態的執行緒池不會再接收新任務,但還會處理已經提交到阻塞佇列中等待處理的任務

3、STOP:1<<COUNT_BITS,即高3位為001,低29位為0,該狀態的執行緒池不會再接收新任務,不會處理在阻塞佇列中等待的任務,而且還會中斷正在執行的任務

4、TIDYING:2<<COUNT_BITS,即高3位為010,低29位為0,所有任務都被終止了,workerCount為0,為此狀態時還將呼叫terminated()方法

5、TERMINATED:3<<COUNT_BITS,即高3位為100,低29位為0,terminated()方法呼叫完成後變成此狀態

這些狀態均由int型表示,大小關係為 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循執行緒池從 執行 到 終止這個過程。

runStateOf(int c)  方法:c & 高3位為1,低29位為0的~CAPACITY,用於獲取高3位儲存的執行緒池狀態

workerCountOf(int c)方法:c & 高3位為0,低29位為1的CAPACITY,用於獲取低29位的執行緒數量

ctlOf(int rs, int wc)方法:引數rs表示runState,引數wc表示workerCount,即根據runState和workerCount打包合併成ctl

三、任務提交內部原理

1、execute()  --  提交任務

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

/**

* Executes the given task sometime in the future.  The task

* may execute in a new thread or in an existing pooled thread.

* 在未來的某個時刻執行給定的任務。這個任務用一個新執行緒執行,或者用一個執行緒池中已經存在的執行緒執行

*

* If the task cannot be submitted for execution, either because this

* executor has been shutdown or because its capacity has been reached,

* the task is handled by the current {@code RejectedExecutionHandler}.

* 如果任務無法被提交執行,要麼是因為這個Executor已經被shutdown關閉,要麼是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理

*

* @param command the task to execute

* @throws RejectedExecutionException at discretion of

*         {@code RejectedExecutionHandler}, if the task

*         cannot be accepted for execution                 RejectedExecutionException是一個RuntimeException

* @throws NullPointerException if {@code command} is null

*/

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task.  The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

* 如果執行的執行緒少於corePoolSize,嘗試開啟一個新執行緒去執行command,command作為這個執行緒的第一個任務

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

* 如果任務成功放入佇列,我們仍需要一個雙重校驗去確認是否應該新建一個執行緒(因為可能存在有些執行緒在我們上次檢查後死了) 或者 從我們進入這個方法後,pool被關閉了

* 所以我們需要再次檢查state,如果執行緒池停止了需要回滾入佇列,如果池中沒有執行緒了,新開啟 一個執行緒

*

* 3. If we cannot queue task, then we try to add a new

* thread.  If it fails, we know we are shut down or saturated

* and so reject the task.

* 如果無法將任務入佇列(可能佇列滿了),需要新開區一個執行緒(自己:往maxPoolSize發展)

* 如果失敗了,說明執行緒池shutdown 或者 飽和了,所以我們拒絕任務

*/

int c = ctl.get();

/**

* 1、如果當前執行緒數少於corePoolSize(可能是由於addWorker()操作已經包含對執行緒池狀態的判斷,如此處沒加,而入workQueue前加了)

*/

if (workerCountOf(c) < corePoolSize) {

//addWorker()成功,返回

if (addWorker(command, true))

return;

/**

* 沒有成功addWorker(),再次獲取c(凡是需要再次用ctl做判斷時,都會再次呼叫ctl.get())

* 失敗的原因可能是:

* 1、執行緒池已經shutdown,shutdown的執行緒池不再接收新任務

* 2、workerCountOf(c) < corePoolSize 判斷後,由於併發,別的執行緒先建立了worker執行緒,導致workerCount>=corePoolSize

*/

c = ctl.get();

}

/**

* 2、如果執行緒池RUNNING狀態,且入佇列成功

*/

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();//再次校驗位

/**

* 再次校驗放入workerQueue中的任務是否能被執行

* 1、如果執行緒池不是執行狀態了,應該拒絕新增新任務,從workQueue中刪除任務

* 2、如果執行緒池是執行狀態,或者從workQueue中刪除任務失敗(剛好有一個執行緒執行完畢,並消耗了這個任務),確保還有執行緒執行任務(只要有一個就夠了)

*/

//如果再次校驗過程中,執行緒池不是RUNNING狀態,並且remove(command)--workQueue.remove()成功,拒絕當前command

if (! isRunning(recheck) && remove(command))

reject(command);

//如果當前worker數量為0,通過addWorker(null, false)建立一個執行緒,其任務為null

//為什麼只檢查執行的worker數量是不是0呢?? 為什麼不和corePoolSize比較呢??

//只保證有一個worker執行緒可以從queue中獲取任務執行就行了??

//因為只要還有活動的worker執行緒,就可以消費workerQueue中的任務

else if (workerCountOf(recheck) == 0)

addWorker(null, false);  //第一個引數為null,說明只為新建一個worker執行緒,沒有指定firstTask

//第二個引數為true代表佔用corePoolSize,false佔用maxPoolSize

}

/**

* 3、如果執行緒池不是running狀態 或者 無法入佇列

*   嘗試開啟新執行緒,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當前command

*/

else if (!addWorker(command, false))

reject(command);

}

execute(Runnable command)

引數:     command    提交執行的任務,不能為空執行流程: 1、如果執行緒池當前執行緒數量少於corePoolSize,則addWorker(command, true)建立新worker執行緒,如建立成功返回,如沒建立成功,則執行後續步驟;     addWorker(command, true)失敗的原因可能是:     A、執行緒池已經shutdown,shutdown的執行緒池不再接收新任務     B、workerCountOf(c) < corePoolSize 判斷後,由於併發,別的執行緒先建立了worker執行緒,導致workerCount>=corePoolSize 2、如果執行緒池還在running狀態,將task加入workQueue阻塞佇列中,如果加入成功,進行double-check,如果加入失敗(可能是佇列已滿),則執行後續步驟;     double-check主要目的是判斷剛加入workQueue阻塞佇列的task是否能被執行     A、如果執行緒池已經不是running狀態了,應該拒絕新增新任務,從workQueue中刪除任務     B、如果執行緒池是執行狀態,或者從workQueue中刪除任務失敗(剛好有一個執行緒執行完畢,並消耗了這個任務),確保還有執行緒執行任務(只要有一個就夠了) 3、如果執行緒池不是running狀態 或者 無法入佇列,嘗試開啟新執行緒,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當前command

2、addWorker()  --  新增worker執行緒

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

/**

* Checks if a new worker can be added with respect to current

* pool state and the given bound (either core or maximum). If so,

* the worker count is adjusted accordingly, and, if possible, a

* new worker is created and started, running firstTask as its

* first task. This method returns false if the pool is stopped or

* eligible to shut down. It also returns false if the thread

* factory fails to create a thread when asked.  If the thread

* creation fails, either due to the thread factory returning

* null, or due to an exception (typically OutOfMemoryError in

* Thread#start), we roll back cleanly.

* 檢查根據當前執行緒池的狀態和給定的邊界(core or maximum)是否可以建立一個新的worker

* 如果是這樣的話,worker的數量做相應的調整,如果可能的話,建立一個新的worker並啟動,引數中的firstTask作為worker的第一個任務

* 如果方法返回false,可能因為pool已經關閉或者呼叫過了shutdown

* 如果執行緒工廠建立執行緒失敗,也會失敗,返回false

* 如果執行緒建立失敗,要麼是因為執行緒工廠返回null,要麼是發生了OutOfMemoryError

*

* @param firstTask the task the new thread should run first (or

* null if none). Workers are created with an initial first task

* (in method execute()) to bypass(繞開) queuing when there are fewer

* than corePoolSize threads (in which case we always start one),

* or when the queue is full (in which case we must bypass queue).

* Initially idle threads are usually created via

* prestartCoreThread or to replace other dying workers.

*

* @param core if true use corePoolSize as bound, else

* maximumPoolSize. (A boolean indicator is used here rather than a

* value to ensure reads of fresh values after checking other pool

* state).

* @return true if successful

*/

private boolean addWorker(Runnable firstTask, boolean core) {

//外層迴圈,負責判斷執行緒池狀態

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c); //狀態

// Check if queue empty only if necessary.

/**

* 執行緒池的state越小越是執行狀態,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3

* 1、如果執行緒池state已經至少是shutdown狀態了

* 2、並且以下3個條件任意一個是false

*   rs == SHUTDOWN         (隱含:rs>=SHUTDOWN)false情況: 執行緒池狀態已經超過shutdown,可能是stop、tidying、terminated其中一個,即執行緒池已經終止

*   firstTask == null      (隱含:rs==SHUTDOWN)false情況: firstTask不為空,rs==SHUTDOWN 且 firstTask不為空,return false,場景是線上程池已經shutdown後,還要新增新的任務,拒絕

*   ! workQueue.isEmpty()  (隱含:rs==SHUTDOWN,firstTask==null)false情況: workQueue為空,當firstTask為空時是為了建立一個沒有任務的執行緒,再從workQueue中獲取任務,如果workQueue已經為空,那麼就沒有新增新worker執行緒的必要了

* return false,即無法addWorker()

*/

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

//內層迴圈,負責worker數量+1

for (;;) {

int wc = workerCountOf(c); //worker數量

//如果worker數量>執行緒池最大上限CAPACITY(即使用int低29位可以容納的最大值)

//或者( worker數量>corePoolSize 或  worker數量>maximumPoolSize ),即已經超過了給定的邊界

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

//呼叫unsafe CAS操作,使得worker數量+1,成功則跳出retry迴圈

if (compareAndIncrementWorkerCount(c))

break retry;

//CAS worker數量+1失敗,再次讀取ctl

c = ctl.get();  // Re-read ctl

//如果狀態不等於之前獲取的state,跳出內層迴圈,繼續去外層迴圈判斷

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

// else CAS失敗時因為workerCount改變了,繼續內層迴圈嘗試CAS對worker數量+1

}

}

/**

* worker數量+1成功的後續操作

* 新增到workers Set集合,並啟動worker執行緒

*/

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

final ReentrantLock mainLock = this.mainLock;

w = new Worker(firstTask); //1、設定worker這個AQS鎖的同步狀態state=-1

//2、將firstTask設定給worker的成員變數firstTask

//3、使用worker自身這個runnable,呼叫ThreadFactory建立一個執行緒,並設定給worker的成員變數thread

final Thread t = w.thread;

if (t != null) {

mainLock.lock();

try {

//--------------------------------------------這部分程式碼是上鎖的

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

// 當獲取到鎖後,再次檢查

int c = ctl.get();

int rs = runStateOf(c);

//如果執行緒池在執行running<shutdown 或者 執行緒池已經shutdown,且firstTask==null(可能是workQueue中仍有未執行完成的任務,建立沒有初始任務的worker執行緒執行)

//worker數量-1的操作在addWorkerFailed()

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable   執行緒已經啟動,拋非法執行緒狀態異常

throw new IllegalThreadStateException();

workers.add(w);//workers是一個HashSet<Worker>

//設定最大的池大小largestPoolSize,workerAdded設定為true

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

//--------------------------------------------

}

finally {

mainLock.unlock();

}

//如果往HashSet中新增worker成功,啟動執行緒

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

//如果啟動執行緒失敗

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

addWorker(Runnable firstTask, boolean core)引數:     firstTask:    worker執行緒的初始任務,可以為空     core:           true:將corePoolSize作為上限,false:將maximumPoolSize作為上限addWorker方法有4種傳參的方式:

    1、addWorker(command, true)

    2、addWorker(command, false)

    3、addWorker(null, false)

    4、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行以下分析     第一個:執行緒數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false     第二個:當佇列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果執行緒池也滿了的話就返回false     第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務佇列裡拿任務,這樣就相當於建立了一個新的執行緒,只是沒有馬上分配任務     第四個:這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為執行緒池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行執行流程: 1、判斷執行緒池當前是否為可以新增worker執行緒的狀態,可以則繼續下一步,不可以return false:     A、執行緒池狀態>shutdown,可能為stop、tidying、terminated,不能新增worker執行緒     B、執行緒池狀態==shutdown,firstTask不為空,不能新增worker執行緒,因為shutdown狀態的執行緒池不接收新任務     C、執行緒池狀態==shutdown,firstTask==null,workQueue為空,不能新增worker執行緒,因為firstTask為空是為了新增一個沒有任務的執行緒再從workQueue獲取task,而workQueue為空,說明新增無任務執行緒已經沒有意義 2、執行緒池當前執行緒數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步 3、線上程池的ReentrantLock保證下,向Workers Set中新增新建立的worker例項,新增完成後解鎖,並啟動worker執行緒,如果這一切都成功了,return true,如果新增worker入Set失敗或啟動失敗,呼叫addWorkerFailed()邏輯

3、內部類Worker

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

/**

* Class Worker mainly maintains interrupt control state for

* threads running tasks, along with other minor bookkeeping.

* This class opportunistically extends AbstractQueuedSynchronizer

* to simplify acquiring and releasing a lock surrounding each

* task execution.  This protects against interrupts that are

* intended to wake up a worker thread waiting for a task from

* instead interrupting a task being run.  We implement a simple

* non-reentrant mutual exclusion lock rather than use

* ReentrantLock because we do not want worker tasks to be able to

* reacquire the lock when they invoke pool control methods like

* setCorePoolSize.  Additionally, to suppress interrupts until

* the thread actually starts running tasks, we initialize lock

* state to a negative value, and clear it upon start (in

* runWorker).

*

* Worker類大體上管理著執行執行緒的中斷狀態 和 一些指標

* Worker類投機取巧的繼承了AbstractQueuedSynchronizer來簡化在執行任務時的獲取、釋放鎖

* 這樣防止了中斷在執行中的任務,只會喚醒(中斷)在等待從workQueue中獲取任務的執行緒

* 解釋:

*   為什麼不直接執行execute(command)提交的command,而要在外面包一層Worker呢??

*   主要是為了控制中斷

*   用什麼控制??

*   用AQS鎖,當執行時上鎖,就不能中斷,TreadPoolExecutor的shutdown()方法中斷前都要獲取worker鎖

*   只有在等待從workQueue中獲取任務getTask()時才能中斷

* worker實現了一個簡單的不可重入的互斥鎖,而不是用ReentrantLock可重入鎖

* 因為我們不想讓在呼叫比如setCorePoolSize()這種執行緒池控制方法時可以再次獲取鎖(重入)

* 解釋:

*   setCorePoolSize()時可能會interruptIdleWorkers(),在對一個執行緒interrupt時會要w.tryLock()

*   如果可重入,就可能會在對執行緒池操作的方法中中斷執行緒,類似方法還有:

*   setMaximumPoolSize()

*   setKeppAliveTime()

*   allowCoreThreadTimeOut()

*   shutdown()

* 此外,為了讓執行緒真正開始後才可以中斷,初始化lock狀態為負值(-1),在開始runWorker()時將state置為0,而state>=0才可以中斷

*

*

* Worker繼承了AQS,實現了Runnable,說明其既是一個可執行的任務,也是一把鎖(不可重入)

*/

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

/**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in.  Null if factory fails. */

final Thread thread; //利用ThreadFactory和 Worker這個Runnable建立的執行緒物件

/** Initial task to run.  Possibly null. */

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

//設定AQS的同步狀態private volatile int state,是一個計數器,大於0代表鎖已經被獲取

setState(-1); // inhibit interrupts until runWorker

// 在呼叫runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會判斷 getState()>=0

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this); //根據當前worker建立一個執行緒物件

//當前worker本身就是一個runnable任務,也就是不會用引數的firstTask建立執行緒,而是呼叫當前worker.run()時呼叫firstTask.run()

}

/** Delegates main run loop to outer runWorker  */

public void run() {

runWorker(this); //runWorker()是ThreadPoolExecutor的方法

}

// Lock methods

//

// The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態

// The value 1 represents the locked state. 1代表“鎖定”狀態

protected boolean isHeldExclusively() {

return getState() != 0;

}

/**

* 嘗試獲取鎖

* 重寫AQS的tryAcquire(),AQS本來就是讓子類來實現的

*/

protected boolean tryAcquire(int unused) {

//嘗試一次將state從0設定為1,即“鎖定”狀態,但由於每次都是state 0->1,而不是+1,那麼說明不可重入

//且state==-1時也不會獲取到鎖

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread()); //設定exclusiveOwnerThread=當前執行緒

return true;

}

return false;

}

/**

* 嘗試釋放鎖

* 不是state-1,而是置為0

*/

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

public void lock()        { acquire(1); }

public boolean tryLock()  { return tryAcquire(1); }

public void unlock()      { release(1); }

public boolean isLocked() { return isHeldExclusively(); }

/**

* 中斷(如果執行)

* shutdownNow時會迴圈對worker執行緒執行

* 且不需要獲取worker鎖,即使在worker執行時也可以中斷

*/

void interruptIfStarted() {

Thread t;

//如果state>=0、t!=null、且t沒有被中斷

//new Worker()時state==-1,說明不能中斷

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

}

Worker類 Worker類本身既實現了Runnable,又繼承了AbstractQueuedSynchronizer(以下簡稱AQS),所以其既是一個可執行的任務,又可以達到鎖的效果new Worker() 1、將AQS的state置為-1,在runWoker()前不允許中斷 2、待執行的任務會以引數傳入,並賦予firstTask 3、用Worker這個Runnable建立Thread

之所以Worker自己實現Runnable,並建立Thread,在firstTask外包一層,是因為要通過Worker控制中斷,而firstTask這個工作任務只是負責執行業務Worker控制中斷主要有以下幾方面: 1、初始AQS狀態為-1,此時不允許中斷interrupt(),只有在worker執行緒啟動了,執行了runWoker(),將state置為0,才能中斷     不允許中斷體現在:     A、shutdown()執行緒池時,會對每個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,沒發interrupt()     B、shutdownNow()執行緒池時,不用tryLock()上鎖,但呼叫worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯 2、為了防止某種情況下,在執行中的worker被中斷,runWorker()每次執行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操作需要先獲取worker的鎖,這樣就防止了中斷正在執行的執行緒

Worker實現的AQS為不可重入鎖,為了是在獲得worker鎖的情況下再進入其它一些需要加鎖的方法

Worker和Task的區別: Worker是執行緒池中的執行緒,而Task雖然是runnable,但是並沒有真正執行,只是被Worker呼叫了run方法,後面會看到這部分的實現。

4、runWorker()  --  執行任務

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

/**

* Main worker run loop.  Repeatedly gets tasks from queue and

* executes them, while coping with a number of issues:

* 重複的從佇列中獲取任務並執行,同時應對一些問題:

*

* 1. We may start out with an initial task, in which case we

* don't need to get the first one. Otherwise, as long as pool is

* running, we get tasks from getTask. If it returns null then the

* worker exits due to changed pool state or configuration

* parameters.  Other exits result from exception throws in

* external code, in which case completedAbruptly holds, which

* usually leads processWorkerExit to replace this thread.

* 我們可能使用一個初始化任務開始,即firstTask為null

* 然後只要執行緒池在執行,我們就從getTask()獲取任務

* 如果getTask()返回null,則worker由於改變了執行緒池狀態或引數配置而退出

* 其它退出因為外部程式碼拋異常了,這會使得completedAbruptly為true,這會導致在processWorkerExit()方法中替換當前執行緒

*

* 2. Before running any task, the lock is acquired to prevent

* other pool interrupts while the task is executing, and

* clearInterruptsForTaskRun called to ensure that unless pool is

* stopping, this thread does not have its interrupt set.

* 在任何任務執行之前,都需要對worker加鎖去防止在任務執行時,其它的執行緒池中斷操作

* clearInterruptsForTaskRun保證除非執行緒池正在stoping,執行緒不會被設定中斷標示

*

* 3. Each task run is preceded by a call to beforeExecute, which

* might throw an exception, in which case we cause thread to die

* (breaking loop with completedAbruptly true) without processing

* the task.

* 每個任務執行前會呼叫beforeExecute(),其中可能丟擲一個異常,這種情況下會導致執行緒die(跳出迴圈,且completedAbruptly==true),沒有執行任務

* 因為beforeExecute()的異常沒有cache住,會上拋,跳出迴圈

*

* 4. Assuming beforeExecute completes normally, we run the task,

* gathering any of its thrown exceptions to send to

* afterExecute. We separately handle RuntimeException, Error

* (both of which the specs guarantee that we trap) and arbitrary

* Throwables.  Because we cannot rethrow Throwables within

* Runnable.run, we wrap them within Errors on the way out (to the

* thread's UncaughtExceptionHandler).  Any thrown exception also

* conservatively causes thread to die.

* 假定beforeExecute()正常完成,我們執行任務

* 彙總任何丟擲的異常併發送給afterExecute(task, thrown)

* 因為我們不能在Runnable.run()方法中重新上拋Throwables,我們將Throwables包裝到Errors上拋(會到執行緒的UncaughtExceptionHandler去處理)

* 任何上拋的異常都會導致執行緒die

*

* 5. After task.run completes, we call afterExecute, which may

* also throw an exception, which will also cause thread to

* die. According to JLS Sec 14.20, this exception is the one that

* will be in effect even if task.run throws.

* 任務執行結束後,呼叫afterExecute(),也可能拋異常,也會導致執行緒die

* 根據JLS Sec 14.20,這個異常(finally中的異常)會生效

*

* The net effect of the exception mechanics is that afterExecute

* and the thread's UncaughtExceptionHandler have as accurate

* information as we can provide about any problems encountered by

* user code.

*

* @param w the worker

*/

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

// new Worker()是state==-1,此處是呼叫Worker類的tryRelease()方法,將state置為0, 而interruptIfStarted()中只有state>=0才允許呼叫中斷

boolean completedAbruptly = true; //是否“突然完成”,如果是由於異常導致的進入finally,那麼completedAbruptly==true就是突然完成的

try {

/**

* 如果task不為null,或者從阻塞佇列中getTask()不為null

*/

while (task != null || (task = getTask()) != null) {

w.lock(); //上鎖,不是為了防止併發執行任務,為了在shutdown()時不終止正在執行的worker

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted.  This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

/**

* clearInterruptsForTaskRun操作

* 確保只有在執行緒stoping時,才會被設定中斷標示,否則清除中斷標示

* 1、如果執行緒池狀態>=stop,且當前執行緒沒有設定中斷狀態,wt.interrupt()

* 2、如果一開始判斷執行緒池狀態<stop,但Thread.interrupted()為true,即執行緒已經被中斷,又清除了中斷標示,再次判斷執行緒池狀態是否>=stop

*   是,再次設定中斷標示,wt.interrupt()

*   否,不做操作,清除中斷標示後進行後續步驟

*/

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt(); //當前執行緒呼叫interrupt()中斷

try {

//執行前(子類實現)

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

}

catch (RuntimeException x) {

thrown = x; throw x;

}

catch (Error x) {

thrown = x; throw x;

}

catch (Throwable x) {

thrown = x; throw new Error(x);

}

finally {

//執行後(子類實現)

afterExecute(task, thrown); //這裡就考驗catch和finally的執行順序了,因為要以thrown為引數

}

}

finally {

task = null; //task置為null

w.completedTasks++; //完成任務數+1

w.unlock(); //解鎖

}

}

completedAbruptly = false;

}

finally {

//處理worker的退出

processWorkerExit(w, completedAbruptly);

}

}

runWorker(Worker w)執行流程: 1、Worker執行緒啟動後,通過Worker類的run()方法呼叫runWorker(this) 2、執行任務之前,首先worker.unlock(),將AQS的state置為0,允許中斷當前worker執行緒 3、開始執行firstTask,呼叫task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,為了防止在任務執行時被執行緒池一些中斷操作中斷 4、在任務執行前後,可以根據業務場景自定義beforeExecute() 和 afterExecute()方法 5、無論在beforeExecute()、task.run()、afterExecute()發生異常上拋,都會導致worker執行緒終止,進入processWorkerExit()處理worker退出的流程 6、如正常執行完當前task後,會通過getTask()從阻塞佇列中獲取新任務,當佇列中沒有任務,且獲取任務超時,那麼當前worker也會進入退出流程

5、getTask()  --  獲取任務

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

/**

* Performs blocking or timed wait for a task, depending on

* current configuration settings, or returns null if this worker

* must exit because of any of:  以下情況會返回null

* 1. There are more than maximumPoolSize workers (due to

*    a call to setMaximumPoolSize).

*    超過了maximumPoolSize設定的執行緒數量(因為呼叫了setMaximumPoolSize())

* 2. The pool is stopped.

*    執行緒池被stop

* 3. The pool is shutdown and the queue is empty.

*    執行緒池被shutdown,並且workQueue空了

* 4. This worker timed out waiting for a task, and timed-out

*    workers are subject to termination (that is,

*    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})

*    both before and after the timed wait.

*    執行緒等待任務超時

*

* @return task, or null if the worker must exit, in which case

*         workerCount is decremented

*         返回null表示這個worker要結束了,這種情況下workerCount-1

*/

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

/**

* 外層迴圈

* 用於判斷執行緒池狀態

*/

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

/**

* 對執行緒池狀態的判斷,兩種情況會workerCount-1,並且返回null

* 執行緒池狀態為shutdown,且workQueue為空(反映了shutdown狀態的執行緒池還是要執行workQueue中剩餘的任務的)

* 執行緒池狀態為stop(shutdownNow()會導致變成STOP)(此時不用考慮workQueue的情況)

*/

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount(); //迴圈的CAS減少worker數量,直到成功

return null;

}

boolean timed;      // Are workers subject to culling?

// 是否需要定時從workQueue中獲取

/**

* 內層迴圈

* 要麼break去workQueue獲取任務

* 要麼超時了,worker count-1

*/

for (;;) {

int wc = workerCountOf(c);

timed = allowCoreThreadTimeOut || wc > corePoolSize; //allowCoreThreadTimeOut預設為false

//如果allowCoreThreadTimeOut為true,說明corePoolSize和maximum都需要定時

//如果當前執行執行緒數<maximumPoolSize,並且timedOut 和 timed 任一為false,跳出迴圈,開始從workQueue獲取任務

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

/**

* 如果到了這一步,說明要麼執行緒數量超過了maximumPoolSize(可能maximumPoolSize被修改了)

* 要麼既需要計時timed==true,也超時了timedOut==true

* worker數量-1,減一執行一次就行了,然後返回null,在runWorker()中會有邏輯減少worker執行緒

* 如果本次減一失敗,繼續內層迴圈再次嘗試減一

*/

if (compareAndDecrementWorkerCount(c))

return null;

//如果減數量失敗,再次讀取ctl

c = ctl.get();  // Re-read ctl

//如果執行緒池執行狀態發生變化,繼續外層迴圈

//如果狀態沒變,繼續內層迴圈

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

try {

//poll() - 使用  LockSupport.parkNanos(this, nanosTimeout) 掛起一段時間,interrupt()時不會拋異常,但會有中斷響應

//take() - 使用 LockSupport.park(this) 掛起,interrupt()時不會拋異常,但會有中斷響應

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    //大於corePoolSize

workQueue.take();                                        //小於等於corePoolSize

//如獲取到了任務就返回

if (r != null)

return r;

//沒有返回,說明超時,那麼在下一次內層迴圈時會進入worker count減一的步驟

timedOut = true;

}

/**

* blockingQueue的take()阻塞使用LockSupport.park(this)進入wait狀態的,對LockSupport.park(this)進