1. 程式人生 > >【JUC】如何理解執行緒池?第四種使用執行緒的方式

【JUC】如何理解執行緒池?第四種使用執行緒的方式

執行緒池的概念

執行緒池的主要工作的控制執行的執行緒的數量,處理過程種將任務放在佇列,執行緒建立後再啟動折現任務,如果執行緒數量超過了最大的數量,則超過部分的執行緒排隊等待,直到其他執行緒執行完畢後,從佇列種取出任務來執行。

處理流程:

1.執行緒池判斷核心執行緒池的執行緒是否全部在執行任務?

  1.1 不是:建立一個新的工作執行緒執行任務。

  1.2 是:

    2. 執行緒池判斷工作佇列是否已經滿了?

      2.1 沒有滿:將新提交的任務儲存在工作佇列中。

      2.2 滿了:

        3. 執行緒池判斷執行緒池的執行緒是否都在工作?

          3.1 是:交由飽和策略來處理這個任務。

          3.2 不是:建立一個新的工作執行緒來執行任務。       

特點:執行緒複用、控制最大併發數、管理執行緒。

執行緒池的優勢

1. 降低資源消耗,通過重複利用已經建立的執行緒,降低了執行緒建立和銷燬產生的消耗。

2. 提高響應速度,任務到達時,任務不需要等待執行緒建立就能立即執行。

3. 提高執行緒的可管理性,執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行同一的分配、調優和監控。

執行緒池的使用

 Java執行緒池是通過Executor框架實現的,該框架中用到了Executor、Executors、ExecutorService和ThreadPoolExecutor類。

 

 

 

 

 

 具體使用示例:

 1     public static void fixedThreadPool() {
 2         ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定執行緒
 3         try {
 4             for (int i = 0; i < 10; i++) {
 5                 threadPool.execute(()->{
 6                     System.out.println(Thread.currentThread().getName());
 7                 });
 8             }
 9         }catch (Exception e){
10             e.printStackTrace();
11         }finally {
12             threadPool.shutdown();
13         }
14     }

輸出結果

pool-1-thread-2
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-1
pool-1-thread-3
pool-1-thread-5
pool-1-thread-1
pool-1-thread-2
pool-1-thread-4
View Code

執行緒池的原始碼及重要引數

Executors.newFixedThreadPool(int)

固定執行緒數,適用場景:執行長期任務,效能好。

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

Executors.newSingleThreadExecutor()

一池一個執行緒,使用場景:一個任務接一個任務執行的時候。

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6     }

Executors.newCachedThreadPool()

N個執行緒,帶快取,適用場景:執行很多短期非同步的小程式或者負載較輕的伺服器。

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

ThreadPoolExecutor

ThreadPoolExecutor的執行示意圖:

 

 

1. corePoolSize:執行緒池中的常駐核心執行緒數

2. maximumPoolSize:執行緒池能夠容納同時執行的最大執行緒數,必須大於等於1【擴容的上限】。如果工作佇列滿了,core也滿了的時候,執行緒池會擴容,直到達到maximumPoolSize(新來的任務會直接搶佔擴容執行緒,不進入工作佇列,工作佇列中的任務繼續等待)。

 1     public static void main(String[] args) {
 2         ExecutorService threadPool = new ThreadPoolExecutor(
 3                 2, //corePoolSize
 4                 5, //maximumPoolSize
 5                 100L, //keepAliveTime
 6                 TimeUnit.SECONDS,
 7                 new LinkedBlockingDeque<>(3),
 8                 Executors.defaultThreadFactory(),
 9                 new ThreadPoolExecutor.AbortPolicy());//N個執行緒帶快取
10         try {
11             for (int i = 1; i <= 6; i++) {
12                 final int tmp = i;
13                 threadPool.execute(()->{
14                     System.out.println(Thread.currentThread().getName()+"執行緒"+",執行任務"+tmp);
15                     try {
16                         TimeUnit.SECONDS.sleep(4);
17                     } catch (InterruptedException e) {
18                         e.printStackTrace();
19                     }
20                 });
21             }
22         }catch (Exception e){
23             e.printStackTrace();
24         }finally {
25             threadPool.shutdown();
26         }
27     }

輸出結果:

pool-1-thread-2執行緒,執行任務2
pool-1-thread-3執行緒,執行任務6
pool-1-thread-1執行緒,執行任務1
pool-1-thread-3執行緒,執行任務3
pool-1-thread-2執行緒,執行任務4
pool-1-thread-1執行緒,執行任務5
View Code

當執行緒池中有2個核心執行緒時,執行緒1和2正在執行任務1和2,任務3、4、5在工作佇列中等候,此時工作佇列滿了,core也滿了的時候,且core< maximumPoolSize,任務6的出現引起執行緒池的擴容,任務6在3、4、5執行任務前進行了搶佔。所以從輸出結果可以看出新來的任務會直接搶佔新擴容的執行緒。

3. keepAliveTime:多餘的空閒執行緒的存活時間。當前執行緒數超過corePoolSize的時候,空閒時間達到keepAliveTime時,多餘的空閒執行緒會被銷燬直到剩下corePoolSize的數量的執行緒。

4. unit:keepAliveTime的單位

5. workQueue:(阻塞佇列)工作佇列,任務等待區,被提交但是沒有被執行的任務。

6. threadFactory:執行緒工廠,用於建立執行緒,一般用預設即可。

7. handler:拒絕策略。當佇列滿了並且工作執行緒大於執行緒池的最大執行緒數的時候觸發拒絕策略。

 

5個引數的建構函式

1     public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue) {
6         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7              Executors.defaultThreadFactory(), defaultHandler);
8     }

7個引數的建構函式

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.acc = System.getSecurityManager() == null ?
16                 null :
17                 AccessController.getContext();
18         this.corePoolSize = corePoolSize;
19         this.maximumPoolSize = maximumPoolSize;
20         this.workQueue = workQueue;
21         this.keepAliveTime = unit.toNanos(keepAliveTime);
22         this.threadFactory = threadFactory;
23         this.handler = handler;
24     }

執行緒池的底層工作原理及原始碼

ThreadPoolExecutor執行execute方法分下面4種情況。

1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟需要獲取全域性鎖)。

2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。

3)如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。

4)如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。

 

ThreadPoolExecutor採取上述步驟的總體設計思路,是為了在執行execute()方法時,儘可能地避免獲取全域性鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後(當前執行的執行緒數大於等於corePoolSize),幾乎所有的execute()方法呼叫都是執行步驟2,而步驟2不需要獲取全域性鎖。

 

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         int c = ctl.get();
 5         //如果執行緒數小於核心執行緒數,建立執行緒執行任務
 6         if (workerCountOf(c) < corePoolSize) {
 7             if (addWorker(command, true))
 8                 return;
 9             c = ctl.get();
10         }
11         //如果執行緒數大於等於核心執行緒數或執行緒建立失敗,當前任務加入工作佇列
12         if (isRunning(c) && workQueue.offer(command)) {
13             int recheck = ctl.get();
14             if (! isRunning(recheck) && remove(command))
15                 reject(command);
16             else if (workerCountOf(recheck) == 0)
17                 addWorker(null, false);
18         }
19         //如果執行緒數不處於執行中或人物失效無法放入佇列,
20         //且當前執行緒數量小於最大允許的執行緒數,則建立一個執行緒執行任務
21         else if (!addWorker(command, false))
22             reject(command);
23     }

 

工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會迴圈獲取工作佇列裡的任務來執行。我們可以從Worker類的run()方法裡看到。

 1     public void run() {
 2         runWorker(this);
 3     }
 4 
 5     final void runWorker(Worker w) {
 6         Thread wt = Thread.currentThread();
 7         Runnable task = w.firstTask;
 8         w.firstTask = null;
 9         w.unlock(); // allow interrupts
10         boolean completedAbruptly = true;
11         try {//迴圈獲取工作佇列裡的任務執行
12             while (task != null || (task = getTask()) != null) {
13                 w.lock();
14                 // If pool is stopping, ensure thread is interrupted;
15                 // if not, ensure thread is not interrupted.  This
16                 // requires a recheck in second case to deal with
17                 // shutdownNow race while clearing interrupt
18                 if ((runStateAtLeast(ctl.get(), STOP) ||
19                      (Thread.interrupted() &&
20                       runStateAtLeast(ctl.get(), STOP))) &&
21                     !wt.isInterrupted())
22                     wt.interrupt();
23                 try {
24                     beforeExecute(wt, task);
25                     Throwable thrown = null;
26                     try {
27                         task.run();
28                     } catch (RuntimeException x) {
29                         thrown = x; throw x;
30                     } catch (Error x) {
31                         thrown = x; throw x;
32                     } catch (Throwable x) {
33                         thrown = x; throw new Error(x);
34                     } finally {
35                         afterExecute(task, thrown);
36                     }
37                 } finally {
38                     task = null;
39                     w.completedTasks++;
40                     w.unlock();
41                 }
42             }
43             completedAbruptly = false;
44         } finally {
45             processWorkerExit(w, completedAbruptly);
46         }
47     }

執行緒池的拒絕策略(RejectedExecutionHandler)

當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略。
1. AbortPolicy:直接丟擲異常。RejectedExecutionException
2. CallerRunsPolicy:只用呼叫者所線上程來執行任務。不拋棄任務,也不丟擲異常,將任務回退到呼叫者。

例如:任務數 > maximumPoolSize+Queue.capacity=8的時候拒絕任務9和10,任務回退給呼叫者,示例中的呼叫者就是main執行緒。

pool-1-thread-1執行緒,執行任務1
main執行緒,執行任務9
pool-1-thread-3執行緒,執行任務6
pool-1-thread-2執行緒,執行任務2
pool-1-thread-5執行緒,執行任務8
pool-1-thread-4執行緒,執行任務7
main執行緒,執行任務10
pool-1-thread-3執行緒,執行任務3
pool-1-thread-1執行緒,執行任務5
pool-1-thread-5執行緒,執行任務4

3. DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
4. DiscardPolicy:不處理,丟棄掉。

任務佇列(runnableTaskQueue)

1. ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
2. LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
3. SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。

在實際開發中選擇那種執行緒池?

三種:單一/固定數/可變,都不能用。為什麼不用?

在實際的開發中執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒。

因為不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗完記憶體或者過度切換的問題。

執行緒池不允許適用Executors去建立,而是通過ThreadPoolExecutor的方式,可以避免資源耗盡的風險。

Executors中的執行緒池物件存在的問題:

1. FixedThreadPool和SingleThreadPool:允許請求佇列的長度為Integer.MAX_VALUE,可能會堆積大量請求,導致OOM

2. CachedThreadPool和ScheduledThreadPool:允許建立執行緒數量為Integer.MAX_VALUE,可能會建立大量請求,導致OOM

所以應該選擇自定義執行緒池。

如何配置自定義的執行緒池引數?

首先查詢伺服器是幾核的?Runtime.getRuntime().availableProcessors();

1. CPU密集型

任務需要大量的運算,而沒有阻塞,CPU一直全速執行,CPU密集任務只有在真正的多核CPU上才可能得到加速。(通過多執行緒)

應該配置儘可能少的執行緒數量:CPU核數+1個執行緒的執行緒池

2. IO密集型

IO密集型任務並不是一直執行任務,則應配置儘可能多的執行緒,

CPU核數 * 2

CPU核數 / 1 - 阻塞係數(0.8~0.9)

補充:CPU密集 & IO密集

CPU密集型,又稱計算密集型任務。它的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視訊進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多工完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。計算密集型任務由於主要消耗CPU資源,因此,程式碼執行效率至關重要。

IO密集型,涉及到網路、磁碟IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和記憶體的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少。