1. 程式人生 > >jdk自帶的執行緒池框架ThreadPoolExcutor原始碼分析

jdk自帶的執行緒池框架ThreadPoolExcutor原始碼分析

一、前言

  JUC這部分還有執行緒池這一塊沒有分析,需要抓緊時間分析,下面開始ThreadPoolExecutor,其是執行緒池的基礎,分析完了這個類會簡化之後的分析,執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。下面開始分析。

二、ThreadPoolExecutor資料結構

   在ThreadPoolExecutor的內部,主要由BlockingQueue和AbstractQueuedSynchronizer對其提供支援,BlockingQueue介面有多種資料結構的實現,如

LinkedBlockingQueueArrayBlockingQueue等,而AbstractQueuedSynchronizer在之前有過詳細的分析,有興趣的讀者可以參考。

三、ThreadPoolExecutor原始碼分析

  3.1 類的繼承關係

public class ThreadPoolExecutor extends AbstractExecutorService {}

  說明:ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService執行方法的預設實現。

  3.2 類的內部類

  ThreadPoolExecutor的核心內部類為Worker,其對資源進行了複用,減少建立執行緒的開銷,還有若干個策略類。內部類的類圖如下

  說明:可以看到Worker繼承了AQS抽象類並且實現了Runnable介面,其是ThreadPoolExecutor的核心內部類。而對於AbortPolicy,用於被拒絕任務的處理程式,它將丟擲 RejectedExecutionException、CallerRunsPolicy,用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務、DiscardPolicy,用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務、DiscardOldestPolicy,用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute

;如果執行程式已關閉,則會丟棄該任務。這些都是拒絕任務提交時的所採用的不同策略。

  ① Worker類

  1. 類的繼承關係  

private final class Worker 
    extends AbstractQueuedSynchronizer 
    implements Runnable {}

   說明:Worker繼承了AQS抽象類,其重寫了AQS的一些方法,並且其也可作為一個Runnable物件,從而可以建立執行緒Thread。

  2. 類的屬性

複製程式碼
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        // 版本號
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        // worker 所對應的執行緒
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // worker所對應的第一個任務
        Runnable firstTask;
        /** Per-thread task counter */
        // 已完成任務數量
        volatile long completedTasks;
    }
複製程式碼

  說明:Worker屬性中比較重要的屬性如下,Thread型別的thread屬性,用來封裝worker(因為worker為Runnable物件),表示一個執行緒;Runnable型別的firstTask,其表示該worker所包含的Runnable物件,即使用者自定義的Runnable物件,完成使用者自定義的邏輯的Runnable物件;volatile修飾的long型別的completedTasks,表示已完成的任務數量。

  3. 類的建構函式

複製程式碼
        Worker(Runnable firstTask) {
            // 設定狀態為-1
            setState(-1); // inhibit interrupts until runWorker
            // 初始化第一個任務
            this.firstTask = firstTask;
            // 根據當前worker,初始化執行緒
            this.thread = getThreadFactory().newThread(this);
        }
複製程式碼

  說明:用於構造一個worker物件,並設定AQS的state為-1,同時初始化了對應的域。

  4. 核心函式分析

複製程式碼
// 重寫了Runnable的run方法
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        // 是否被獨佔,0代表未被獨佔,1代表被獨佔
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        // 嘗試獲取
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) { // 比較並設定狀態成功
                // 設定獨佔執行緒
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 嘗試釋放
        protected boolean tryRelease(int unused) {
            // 設定獨佔執行緒為null
            setExclusiveOwnerThread(null);
            // 設定狀態為0
            setState(0);
            return true;
        }
        // 獲取鎖
        public void lock()        { acquire(1); }
        // 嘗試獲取鎖
        public boolean tryLock()  { return tryAcquire(1); }
        // 釋放鎖
        public void unlock()      { release(1); }
        // 是否被獨佔
        public boolean isLocked() { return isHeldExclusively(); }
        // 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // AQS狀態大於等於0並且worker對應的執行緒不為null並且該執行緒沒有被中斷
                try {
                    // 中斷執行緒
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
複製程式碼

  說明:Worker的函式主要是重寫了AQS的相應函式和重寫了Runnable的run函式,重寫的函式比較簡單,具體的可以參見AQS的分析,這裡不再累贅。

  3.3 類的屬性  

複製程式碼
public class ThreadPoolExecutor extends AbstractExecutorService {
    // 執行緒池的控制狀態(用來表示執行緒池的執行狀態(整形的高3位)和執行的worker數量(低29位))
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29位的偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大容量(2^29 - 1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 執行緒執行狀態,總共有5個狀態,需要3位來表示(所以偏移量的29 = 32 - 3)
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 阻塞佇列
    private final BlockingQueue<Runnable> workQueue;
    // 可重入鎖
    private final ReentrantLock mainLock = new ReentrantLock();
    // 存放工作執行緒集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 終止條件
    private final Condition termination = mainLock.newCondition();
    // 最大執行緒池容量
    private int largestPoolSize;
    // 已完成任務數量
    private long completedTaskCount;
    // 執行緒工廠
    private volatile ThreadFactory threadFactory;
    // 拒絕執行處理器
    private volatile RejectedExecutionHandler handler;
    // 執行緒等待執行時間
    private volatile long keepAliveTime;
    // 是否執行核心執行緒超時
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小
    private volatile int corePoolSize;
    // 最大執行緒池大小
    private volatile int maximumPoolSize;
    // 預設拒絕執行處理器
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    //
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
}    
複製程式碼

  說明:這裡著重講解一下AtomicInteger型別的ctl屬性,ctl為執行緒池的控制狀態,用來表示執行緒池的執行狀態(整形的高3位)和執行的worker數量(低29位)),其中,執行緒池的執行狀態有如下幾種

複製程式碼
    /**
    * RUNNING    :    接受新任務並且處理已經進入阻塞佇列的任務
    * SHUTDOWN    :    不接受新任務,但是處理已經進入阻塞佇列的任務
    * STOP        :    不接受新任務,不處理已經進入阻塞佇列的任務並且中斷正在執行的任務
    * TIDYING    :    所有的任務都已經終止,workerCount為0, 執行緒轉化為TIDYING狀態並且呼叫terminated鉤子函式
    * TERMINATED:    terminated鉤子函式已經執行完成
    **/
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
複製程式碼

  說明:由於有5種狀態,最少需要3位表示,所以採用的AtomicInteger的高3位來表示,低29位用來表示worker的數量,即最多表示2^29 - 1。

  3.4 類的建構函式

  1. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>)型建構函式

複製程式碼
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
複製程式碼

  說明:該建構函式用給定的初始引數和預設的執行緒工廠及被拒絕的執行處理程式建立新的 ThreadPoolExecutor。

   2. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)型建構函式  

複製程式碼
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
複製程式碼

  說明:該建構函式用給定的初始引數和預設被拒絕的執行處理程式建立新的 ThreadPoolExecutor

   3. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)型建構函式

複製程式碼
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
複製程式碼

  說明:該建構函式用給定的初始引數和預設的執行緒工廠建立新的 ThreadPoolExecutor

   4. ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)型建構函式

複製程式碼
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||                                                // 核心大小不能小於0
            maximumPoolSize <= 0 ||                                            // 執行緒池的初始最大容量不能小於0
            maximumPoolSize < corePoolSize ||                                // 初始最大容量不能小於核心大小
            keepAliveTime < 0)                                                // keepAliveTime不能小於0
            throw new IllegalArgumentException();                                
        if (workQueue == null || threadFactory == null || handler == null)
            
            
           

相關推薦

jdk執行框架ThreadPoolExcutor原始碼分析

一、前言   JUC這部分還有執行緒池這一塊沒有分析,需要抓緊時間分析,下面開始ThreadPoolExecutor,其是執行緒池的基礎,分析完了這個類會簡化之後的分析,執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務

jdk執行詳解

一、前言 在最近做的一個專案中,需要大量的使用到多執行緒和執行緒池,下面就java自帶的執行緒池和大家一起分享。 二、簡介 多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力,但頻繁的建立執行緒的開銷是很大的,

python執行

1.  注意:  導包是:     from multiprocessing.pool import ThreadPool  #執行緒池不在thrading中 2.  程式碼:   from mutiprocessing.pool import ThreadPool   def func(*args,

使用java執行

  java提供自帶的執行緒池,而不需要自己去開發一個自定義執行緒池了。 執行緒池類 ThreadPoolExecutor在包java.util.concurrent下 &nb

java concurrent包執行和佇列詳細講解

Java執行緒池使用說明一簡介執行緒的使用在java中佔有極其重要的地位,在jdk1.4極其之前的jdk版本中,關於執行緒池的使用是極其簡陋的。在jdk1.5之後這一情況有了很大的改觀。Jdk1.5之後加入了java.util.concurrent包,這個包中主要介紹java

java執行和佇列詳細講解

Java執行緒池使用說明 一簡介 執行緒的使用在java中佔有極其重要的地位,在jdk1.4極其之前的jdk版本中,關於執行緒池的使用是極其簡陋的。在jdk1.5之後這一情況有了很大的改觀。Jdk1.5之後加入了java.util.concurrent包,這個包中主要介紹j

Java執行技術之二 Java執行實現

一,介紹   類檢視如下:     自Java 1.5後,Java對執行緒相關的庫做了很大的拓展,執行緒池就是其中之一。Java執行緒的新特性多數在java.util.concurrent,其包含眾多的介面和類。其中java.util.concurrent.Execut

java.jse-執行-執行-java執行和佇列詳細講解

Java執行緒池使用說明 一簡介 執行緒的使用在java中佔有極其重要的地位,在jdk1.4極其之前的jdk版本中,關於執行緒池的使用是極其簡陋的。在jdk1.5之後這一情況有了很大的改觀。Jdk1.5之後加入了java.util.concurrent包,這個包中主要介紹

Java執行之ThreadPoolExecutor原始碼分析

一、引言 Java併發工具包自帶了很多常用的執行緒池,程式可以將定義的Runnable、Callable任務提交到執行緒池當中執行,由執行緒池負責非同步執行其中的任務。 Java執行緒池框架結構圖: 其中,Executors是一個執行緒池靜態工廠類,可以呼叫其

JUC(4)---java執行原理及原始碼分析

執行緒池,既然是個池子裡面肯定就裝很多執行緒。 如果併發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的建立和銷燬 執行緒,如此一來會大大降低系統的效率。可能出現伺服器在為每個請求建立新執行緒和銷燬線 程上花費的時間和消耗的系統資源要比處理實際的使用者請求的時間和資源更多。因此Java中提供執行緒

2.3四種執行連線的配置和使用(和定義執行

四種執行緒連線池的配置和使用 最終呼叫類和方法 {引數有 核心執行緒數目,最大執行緒數目,存活時間(當前執行緒執行完這個任務之後,等待下一個任務到來的最長等待時間。如果在這個時間內沒有新的任務來到,那當前執行緒就會退出),時間單位,等待佇列(用於存放待執行的任務)} public

Python 定義執行

"""思路1,將任務放在佇列 1)建立佇列:(初始化) 2)設定大小,執行緒池的最大容量 3)真實建立的執行緒 列表 4)空閒的執行緒數量2,著手開始處理任務 1)建立執行緒 2)空閒執行緒數量大於0,則不再建立執行緒 3)建立執行緒池的數量 不能高於

定義執行、內建執行.md

自定義簡單執行緒池 python執行緒是可以重複利用的,如果呼叫的時候每次都建立一個執行緒,則太浪費資源了 我把多執行緒比作服務員,每次有客人來的時候,都分配一個專門的服務員去服務; 當客人走了之後,服務員回到空閒狀態,繼續等待新的客人 import threa

Spring Boot 基礎系列教程 | 第三十二篇:使用@Async實現非同步呼叫:定義執行

推薦 Spring Boot/Cloud 視訊: 在之前的Spring Boot基礎教程系列中,已經通過《Spring Boot中使用@Async實現非同步呼叫》一文介紹過如何使用@Async註解來實現非同步呼叫了。但是,對於這些非同步執行的控制是我們保障自身

定義執行Executors

在使用有界佇列時,若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize,則優先建立執行緒,若大於corePoolSize,則會將任務加入佇列,若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒,若執行緒數

執行框架executor

Eexecutor作為靈活且強大的非同步執行框架,其支援多種不同型別的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的執行緒相當於生產者,執行任務的執行緒相當於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支援,以及

jdk單例執行和sping執行使用

 java提供的原生執行緒池技術處理原理很清晰,故只要使用自己的原生執行緒池技術一般都能滿足專案的需求。java提供了很好的執行緒池實現,比我們自己的實現要更加健壯以及高效,同時功能也更加強大,不建議自己編寫。另外有同學可能用過spring的執行緒池,那麼spring執行緒

jdk之多執行Future框架解析

       其中有錯誤和不足之處請大家提出來,希望和大家一起討論學習和進步;首先來看官方對Future的描述:Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。 接下來看下Future框架類結構圖(沒有包含全部,當前先

八、JAVA多執行執行原理以及定義執行 ThreadPool

為什麼會需要執行緒池技術? (1)Thread是一個重量級的資源,它的建立,啟動以及銷燬都是比較耗費效能的;重複利用執行緒,減少執行緒建立,銷燬的開銷,是一種好的程式設計習慣。 (2)通過new Thread的方法建立執行緒難以管理,並且難以控制數量,執行緒的數量通常和系統的效能呈拋

java定義執行--ThreadPoolExecutors

public class MyThreadPool { public static void main(String[] args) { /** * 1.在使用有界佇列的時候:若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize核心執行緒數的時候,則優先建立執行緒。