1. 程式人生 > >Java執行緒池ThreadPoolExecutor原理詳解

Java執行緒池ThreadPoolExecutor原理詳解

目錄

前言

簡單例項

引數與原理

參考連結

前言

最近java面試,基本都會考察多執行緒的,多執行緒就一定要問執行緒池的,然而我卻在同一個問題上栽跟頭兩次,也是醉醉的。在懊悔之餘所以專門花了一個下午的時間把它詳細總結整理了一遍,也以此告誡自己學東西切不可浮躁,要靜心專研,打紮實基礎。

問題:

問:新建執行緒池有哪幾個引數,具體含義是什麼呢?

問:假如我設定corePoolSize為2,maximumPoolSize為5,現線上程池裡已經有1個了,我再往裡面新增第2到6個,具體的執行邏輯是什麼呢?

問:常用新建執行緒池的方法有哪幾個,他們與ThreadPoolExecutor有關係嗎?

執行緒池的作用

執行緒雖然提供了並行處理速度,但是執行緒的新建銷燬帶來的系統開銷是很大滴,為了能夠更科學的利用執行緒,才有了大名鼎鼎的執行緒池:ThreadPoolExecutor類。作用大致如下:

1、提高資源利用率
執行緒池可以重複利用已經建立了的執行緒,執行緒任務完成後,可以不銷燬而是被安排去做別的任務。
2、具有可管理性
程式設計師可以設定執行緒個數,執行緒池會根據系統的使用情況調整執行的執行緒,降低系統開銷。

簡單例項

package thread.threadpool;



import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;





/**

 * 執行緒池的使用

 * @author shu jun

 *

 */

public class ThreadPoolDemo {



    public static void main(String[] args) {

      //1. 新建五個執行緒

      Thread ta = new MyThread("thread-a");

        Thread tb = new MyThread("thread-b");

        Thread tc = new MyThread("thread-c");

        Thread td = new MyThread("thread-d");

        Thread te = new MyThread("thread-e");

       

        //2. 建立一個可重用固定執行緒數的執行緒池

      ThreadPoolExecutor pool =  new ThreadPoolExecutor(2, 2, 0L,

                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 

                new ThreadPoolExecutor.AbortPolicy());

     

        //3. 將執行緒放入池中進行執行

        pool.execute(ta);

        pool.execute(tb);

        pool.execute(tc);

        pool.execute(td);

        pool.execute(te);



//4. 關閉執行緒池

        pool.shutdown();

    }

}



class MyThread extends Thread {



   public MyThread(String threadName) {

      this.setName(threadName);

   }

  

    @Override

    public void run() {

        System.out.println(Thread.currentThread().getName() + ", " + this.getName()+ " is running.");

    }

}

輸出如下:

引數與原理

ThreadPoolExecutor有4個建構函式,最全的就是上面程式碼中的七個構造引數,其它建構函式只是預設給出引數而已。

corePoolSize

核心執行緒池的執行緒數量

maximumPoolSize

最大的執行緒池執行緒數量

keepAliveTime

空閒執行緒活動保持時間,要當 執行緒數>corePoolSize時才起作用。

unit

執行緒活動保持時間的單位。

workQueue

指定任務佇列所使用的阻塞佇列

ThreadFactory

執行緒工廠,用來建立執行緒

RejectedExecutionHandler

佇列已滿,而且任務量大於最大執行緒的異常處理策略

執行緒往執行緒池裡面丟,最後執行呼叫pool.execute(Thread); excute函式的核心邏輯如下:

Jdk excute原始碼如下:

public void execute(Runnable command) {

    // 如果任務為null,則丟擲異常。

    if (command == null)

        throw new NullPointerException();

    // 獲取ctl對應的int值。該int值儲存了"執行緒池中任務的數量"和"執行緒池狀態"資訊

    int c = ctl.get();

    // 1. 當執行緒池中的任務數量 < "核心池大小"時,即執行緒池中少於corePoolSize個任務。

    // 則通過addWorker(command, true)新建一個執行緒,並將任務(command)新增到該執行緒中;然後,啟動該執行緒從而執行任務。

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

}



    // 2. 當執行緒池中的任務數量 >= "核心池大小"時,

    // 2.1 而且,"執行緒池處於允許狀態"時,則嘗試將任務新增到阻塞佇列中。

    if (isRunning(c) && workQueue.offer(command)) {

        // 再次確認“執行緒池狀態”,若執行緒池異常終止了,則刪除任務;然後通過reject()執行相應的拒絕策略的內容。

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        // 否則,如果"執行緒池中任務數量"為0,則通過addWorker(null, false)嘗試新建一個執行緒,新建執行緒對應的任務為null。

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

}



    //通過addWorker(command, false)新建一個執行緒,並將任務(command)新增到該執行緒中;然後,啟動該執行緒從而執行任務。

    // 3. 如果addWorker(command, false)執行失敗,則通過reject()執行相應的拒絕策略的內容。

    else if (!addWorker(command, false))

        reject(command);

}
  1. 這一步很好理解的,如果"執行緒池中任務數量" < "核心池大小"時,即執行緒池中少於corePoolSize個任務;此時就新建一個執行緒,並將該任務新增到執行緒中進行執行。
  2. 當執行緒池中的任務數量 >= "核心池大小"時,那麼就跟阻塞佇列workQueue相關了。來一個執行緒就丟到workQueue(這個阻塞佇列大小可以自由設定)裡面去,如果佇列容量很大很大,那就沒maximumPoolSize啥事了,一直往裡面放就可以了,但是這樣是很消耗系統資源滴,所以阻塞佇列經常是有界的,如果滿了,那就繼續第3步。
  3. 佇列滿了就去和maximumPoolSize判斷,小於等於則建立新執行緒,大於則不完了,按照拒絕策略RejectedExecutionHandler,執行reject。具體邏輯在這個比較複雜的addWorker函式中,作用可以理解為新增任務到阻塞佇列;
private boolean addWorker(Runnable firstTask, boolean core) {

           … 此處省略若干字…

          for (;;) {
            // 獲取執行緒池中任務的數量。
            int wc = workerCountOf(c);
            // 如果"執行緒池中任務的數量"超過限制,則返回false。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
         …
}
原理基本上就是以上三步了。
剩下的三個引數中keepAliveTime和unit比較好理解,那麼ThreadFactory呢?

ThreadFactory是一個執行緒工廠,應該說的是執行緒的建立方式吧,預設是用defaultThreadFactory()方法返回DefaultThreadFactory物件,裡面用newThread()來建立的執行緒,這麼搞出來的執行緒對應的任務是Runnable物件,它建立的執行緒都是“非守護執行緒”而且“執行緒優先順序都是Thread.NORM_PRIORITY”。

拒絕策略RejectedExecutionHandler具體又分為:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

阻塞佇列BlockingQueue還是很有內容的,要總結拓展的話可以挖掘出蠻多好玩的,大致分為三類:
1. 直接提交, SynchronousQueue。

它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。

2. 無界佇列, LinkedBlockingQueue。

LinkedBlockingQueue預設為空,則是無界的了,當然也可以傳入引數指定執行緒大小的。

3.有界佇列,ArrayBlockingQueue

這又點像LinkedList和ArrayList的感覺,資料儲存結構不一樣。
 

執行緒池的常用建立方式

常見三種方式:

  • Executors.newCachedThreadPool():無限執行緒池。
  • Executors.newFixedThreadPool(nThreads):建立固定大小的執行緒池。
  • Executors.newSingleThreadExecutor():建立單個執行緒的執行緒池。

檢視原始碼發現其實它們呼叫的都是ThreadPoolExecutor建構函式。

類之間的繼承關係如下:

那三個常用方法都是Executors的static方法,而又去呼叫ThreadPoolExcutor;

public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }



public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>());

    }



public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>(),

                                      threadFactory);

}

可以把上面第2小節的例子修改下 , 可以測試不同的執行緒池新建方式,      

 //2. 建立一個可重用固定執行緒數的執行緒池

      /*ThreadPoolExecutor pool =  new ThreadPoolExecutor(2, 2, 0L,

                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 

                new ThreadPoolExecutor.AbortPolicy());*/

       

        // 無限執行緒池

        //ExecutorService pool =  Executors.newCachedThreadPool();

        // 建立固定大小的執行緒池,與上面一開始面建立ThreadPoolExecutor是等效滴

        ExecutorService pool =  Executors.newFixedThreadPool(2);

        // 建立單個執行緒的執行緒池

        //ExecutorService pool =  Executors.newSingleThreadExecutor();

參考連結

執行緒池真是博大精神,還有不少可拓展研究的,不過主體脈絡應該就這些了。