1. 程式人生 > >Executor執行緒池原理與原始碼解讀

Executor執行緒池原理與原始碼解讀

執行緒池為執行緒生命週期的開銷和資源不足問題提供瞭解決方 案。通過對多個任務重用執行緒,執行緒建立的開銷被分攤到了多個任務上。

執行緒實現方式

Thread、Runnable、Callable

//實現Runnable介面的類將被Thread執行,表示一個基本任務
public interface Runnable {
    //run方法就是它所有內容,就是實際執行的任務
    public abstract void run();
}
//Callable同樣是任務,與Runnable介面的區別在於它介面泛型,同時它執行任務候帶有返回值;
//Callable的使用通過外層封裝成Future來使用
public interface Callable<V> {
    //相對於run方法,call方法帶有返回值
    V call() throws Exception;
}

注意:啟動Thread執行緒只能用start(JNI方法)來啟動,start方法通知虛擬機器,虛擬機器通過呼叫器對映到底層作業系統,通過作業系統來建立執行緒來執行當前任務的run方法

Executor框架

Executor介面是執行緒池框架中最基礎的部分,定義了一個用於執行Runnable的execute方法。 從圖中可以看出Exectuor下有一個重要的子介面ExecutorService,其中定義了執行緒池的具體行為:

  • execute(Runnable runnable):執行Runnable型別的任務
  • submit(task):用來提交Callable或者Runnable任務,並返回代表此任務的Future物件
  • shutdown():在完成已經提交的任務後封閉辦事,不在接管新的任務
  • shutdownNow():停止所有正在履行的任務並封閉辦事
  • isTerminated():是一個鉤子函式,測試是否所有任務都履行完畢了
  • isShutdown():是一個鉤子函式,測試是否該ExecutorService是否被關閉 Executor框架

ExecutorService中的重點屬性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl:對執行緒池的執行狀態和執行緒池中有效執行緒的數量進行控制的一個欄位,它包含兩部分資訊:執行緒池的執行狀態(runState)和執行緒池內有效執行緒的數量(workerCount)。

這裡可以看到,使用Integer型別來儲存,高3位儲存runState,低29位儲存workerCount。COUNT_BITS 就是29,CAPACITY 就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億。

ctl相關方法:

//獲取執行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取活動執行緒數
private static int workerCountOf(int c)  { return c & CAPACITY; }
//獲取執行狀態和活動執行緒數的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

執行緒池的狀態:

RUNNING = ­1 << COUNT_BITS; //高3位為111
SHUTDOWN = 0 << COUNT_BITS; //高3位為000
STOP = 1 << COUNT_BITS; //高3位為001
TIDYING = 2 << COUNT_BITS; //高3位為010
TERMINATED = 3 << COUNT_BITS; //高3位為011

執行緒池執行狀態扭轉

1、RUNNING

  • 狀態說明:執行緒池處於RUNNING狀態,能夠接收新任務,以及對已新增的任務進行處理。
  • 狀態切換:執行緒池的初始化狀態是RUNNING。換句話說,執行緒池一旦被建立,就處於RUNNING狀態,並且執行緒池中的任務數為0。

2、SHUTDOWN

  • 狀態說明:執行緒池處於SHUTDOWN狀態,不接收新任務,能夠處理已經新增的任務。
  • 狀態切換:呼叫shutdown()方法時,執行緒池由RUNNING -> SHUTDOWN。

3、STOP

  • 狀態說明:執行緒池處於STOP狀態,不接收新任務,不處理已提交的任務,並且會中斷正在處理的任務。
  • 狀態切換:呼叫執行緒池中的shutdownNow()方法時,執行緒池由(RUNNING or SHUTDOWN) -> STOP。

4、TIDYING

  • 狀態說明:當所有的任務已經停止,ctl記錄“任務數量”為0,執行緒池會變為TIDYING狀態。當執行緒池處於TIDYING狀態時,會執行鉤子函式 terminated()。 terminated()在ThreadPoolExecutor類中是空, 的,若使用者想線上程池變為TIDYING時,進行相應處理,可以通過過載 terminated()函式來實現。
  • 狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空並且執行緒池中執行任務也為空時,就會由SHUTDOWN -> TIDYING。當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP-> TIDYING。

5、TERMINATED

  • 狀態說明:執行緒池執行緒池徹底停止,執行緒池處於TERMINATED狀態,
  • 狀態切換:執行緒池處於TIDYING狀態時,執行完terminated()之後, 就會由TIDYING->TERMINATED。

執行緒池使用

RunTask類:

public class RunTask implements Runnable {
    public void run() {
        System.out.println("Thread name:"+Thread.currentThread().getName());
    }
}

ExecutorSample類:

public class ExecutorSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i=0;i<20;i++){
            //提交任務無返回值
            executor.execute(new RunTask());
            //任務執行完成後有返回值
            Future<Object> future = executor.submit(new RunTask());
        }
    }
}

執行緒池的具體使用:

  • ThreadPoolExecutor 預設執行緒池
  • ScheduledThreadPoolExecutor 定時執行緒池

ThreadPoolExecutor

執行緒池的建立:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  • corePoolSize:執行緒池中的核心執行緒數。當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。
  • maximumPoolSize:執行緒池中允許的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於maximumPoolSize。
  • keepAliveTime:執行緒池維護執行緒所允許的空閒時間。當執行緒池中的執行緒數量大於corePoolSize時候,如果這時候沒有新的任務提交,核心執行緒外的執行緒不會立即被銷燬,而是會等待,直到等待的時間超過了keepAliveTime unit:keepAliveTime的單位時間
  • workQueue:用於儲存等待被執行的任務的阻塞佇列,且任務必須實現Runnable介面,在JDK中提供瞭如下阻塞佇列: ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,按FIFO排序任務。 LinkedBlockingQueue:基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQueue。 SynchronousQueue:一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常高於LinkedBlockingQueue。
  • PriorityBlockingQueue:具有優先順序的無界阻塞佇列。
  • threadFactory:ThreadFactory 型別的變數,用來建立新執行緒。預設使用ThreadFactory.defaultThreadFactory來建立執行緒, 會使新建立執行緒具有相同的NORM_PRIORITY優先順序並且都是非守護執行緒,同時也設定了執行緒名稱。
  • handler:執行緒池的飽和策略。當阻塞佇列滿了,且沒有空閒的工作佇列,如果繼續提交任務,必須採用一種策略處理該任務.

執行緒池的監控:

public long getTaskCount() //執行緒池已執行與未執行的任務總數
public long getCompletedTaskCount() //已完成的任務數
public int getPoolSize() //執行緒池當前的執行緒數
public int getActiveCount() //執行緒池中正在執行任務的執行緒數量

執行緒池的原理

執行緒池執行示意圖

  • 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意這一個步驟需要獲取全域性鎖)。
  • 如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
  • 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意這一個步驟需要獲取全域性鎖)。
  • 如果建立的新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被執行飽和策略。 ThreadPoolExecutor 採用上述的設計思路,是為執行execute()方法時,儘可能避免獲取全域性鎖(一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後,幾乎所有的execute()方法呼叫都是在執行步驟2,而步驟2不需要獲取