1. 程式人生 > >併發程式設計——ThreadPoolExecutor原始碼分析(二)

併發程式設計——ThreadPoolExecutor原始碼分析(二)

前言

在上一篇中,我們分析了ThreadPoolExecutor中關鍵變數ctl,這篇我們繼續來看ThreadPoolExecutor中的建構函式及其引數。其中引數的相關解釋來源於原始碼中的相關注釋。

建構函式

我們可以看到ThreadPoolExecutor有四個建構函式:

他們其實都是呼叫其中的全引數的建構函式,只不過有一些引數是使用了預設提供的引數。我們可以看一下建構函式:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以看到建構函式現對於引數去進行了校驗:
(1)corePoolSize必須大於等於0,maximumPoolSize必須大於0,maximumPoolSize必須大於等於corePoolSize,keepAliveTime如果傳入了必須大於0。

(2)workQueue、threadFactory、handler不能為空

(3)其中有acc欄位的設定是為了設定安全管理器,我們可以自定義我們的安全管理器,否則為從上下文中去拿。可以參考部落格:jvm中的安全管理器

建構函式中的引數

1.corePoolSize

corePoolSize引數表示執行緒池中一直存活的執行緒的最小數量,這些一直存活的執行緒被稱為核心執行緒,預設情況下,核心執行緒的最小數量都是整數,除非是呼叫了allowCoreThreadTimeout()方法並且傳入了true,即允許核心執行緒數在空閒狀態下超時而停止(terminated狀態),此時如果所有的核心執行緒先後都因為超時停止,那麼執行緒池中核心執行緒數會變為0。預設情況下,核心執行緒是按照需要建立並啟動的,也就是說只有當執行緒池接收到我們提交的任務後,它才會去建立並啟動一定的核心執行緒去執行這些任務。如果沒有接收到相關任務,就不會去主動建立核心執行緒,這種預設的核心執行緒建立啟動方式變主動為被動,類似於觀察者模式,有利於降低系統資源的消耗。當然,也可以通過設定preStartCoreThread()或者preStartAllCoreThreads()方法來改變這一機制,使得在新任務還未提交到執行緒池的時候,執行緒池就已經建立並啟動了一個或所有執行緒,並讓這些核心執行緒在池中等待任務的到來。

2.maximumPoolSize

maximumPoolSize表示執行緒池中能容量執行緒的最大數量,這個值不能超過常量CAPACITY的數值大小,上一篇中也提到了常量CAPACITY的計算方式,這裡不去贅述。但是注意一點,當我們提供的工作佇列是一個無界的佇列,那麼這裡提供的maximumPoolSize將毫無意義。

當我們通過execute方法提交一個任務的時候:

(1)如果執行緒池處於執行狀態(RUNNING)的執行緒數量小於核心執行緒數(corePoolSize),那麼即使有一些非核心執行緒處於空閒狀態,系統也傾向於新建一個執行緒來處理這個任務。

(2)如果執行緒池處於執行狀態(RUNNING)的執行緒數量大於核心執行緒數(corePoolSize),但又小於maximumPoolSize,那麼系統會去判斷執行緒池內部的阻塞佇列是否有空位子,如果有空位子,系統會將該任務先存入阻塞佇列,如果發現佇列中已沒有空位子(即佇列已滿),系統會建立一個新的執行緒來執行任務。

如果將執行緒池中的corePoolSize和maximumPoolSize設定為相同的數(也就是說執行緒池中所有執行緒都是核心執行緒),那麼該執行緒池就是一個固定容量的池子。如果將執行緒池的maximumPoolSize設定為一個非常大的數值(例如Integer.MAX_VALUE),那麼相當於允許執行緒池自己在不同時段調整參與併發的總任務數。通常情況下,都是通過建構函式去初始化corePoolSize和maximumPoolSize,也可以通過set方法調整這兩個引數的大小。

3.keepAliveTime & unit

keepAliveTime表示空閒執行緒處於等待的超時時間,超過該時間後該執行緒會停止工作。當執行緒池中匯流排程數量大於corePoolSize並且allowCoreThreadTimeOut為false時,這些多出來的非核心執行緒一旦進入空閒等待的狀態,就開始計算各自的等待時間,並且這裡設定的keepAliveTime的數值作為他們的超時時間,一旦某個非核心執行緒的等待時間到達了超時時間,該執行緒就會停止工作(terminated)。而如果不去設定allowCoreThreadTimeout為true,核心執行緒及時處於空閒狀態等待了keepAliveTime,也依然可以繼續處於空閒狀態等待。

比較好的應用實踐:

如果要執行的任務相對較多,並且每個任務執行的時間都比較短,那麼可以為keepAliveTime引數設定一個相對較大的值,以提高執行緒的利用率;如果要執行的任務比較少,執行緒池使用率比較低,那麼可以先將該引數設定為一個較小的引數值,通過超時停機的機制來降低系統資源的開銷。

注意一點:建構函式中的引數keepAliveTime和unit這個引數和ThreadPoolExecutor中的keepAliveTime欄位的值不一定相等,欄位被設定為long型的值,且定義為納秒的單位,建構函式中的引數還有unit單位,應該是keepAliveTime和unit計算的結果換算為納秒才和類中的欄位是一樣的值。

keepAliveTime在建構函式中的型別是long型,這樣保證了這個值不會太短。

4.workQueue

建構函式中的workQueue是一個BlockIngQueue(阻塞佇列)的例項。傳入的泛型引數是Runnable,也就是說,workQueue是一個內部元素為Runnable(各種任務,通常是非同步的任務)的阻塞佇列。阻塞佇列是一種類似於”生產者-消費者“模式的佇列,當佇列已滿時如果繼續向佇列中插入元素,該插入操作將被阻塞一直處於等待狀態,直到佇列中有元素被移除,才能進行插入操作;當佇列為空時如果繼續執行元素的刪除或者獲取操作,也會被阻塞進入等待佇列中有新的元素之後才能執行。

workQueue是一個用於儲存等待執行的任務阻塞佇列,當提交一個新的任務到執行緒池後,執行緒池會根據當前池子正在執行的執行緒數量來判斷對這個任務的處理方式:

(1)如果執行緒池中正在執行的執行緒數少於核心執行緒數,那麼執行緒池總是傾向於新建一個執行緒來執行該任務。

(2)如果執行緒池中正在執行的執行緒數不少於核心執行緒數,那麼執行緒池把該任務提交到workQueue中讓其先等待

(3)如果執行緒池中正在執行的執行緒數不少於核心執行緒數,並且執行緒池中的阻塞佇列也滿了使得該任務入隊失敗,那麼執行緒池會去判斷當前池子中執行的執行緒數是否已經等於了該執行緒池允許執行的最大執行緒數。如果發現已經等於,說明池子已滿,那麼就會執行拒絕策略;如果發現執行的執行緒數小於池子允許的最大執行緒數,那麼會建立一個執行緒(這個執行緒是非核心執行緒)來執行該任務。

這其中,佇列對於提交的任務一般有三種策略:
(1)直接切換

常用的佇列是SynchronousQueue(同步佇列),這個佇列內部不會儲存元素,每一次插入操作都會先進入阻塞狀態,一直等到另一個執行緒執行了佇列的刪除操作,然後該插入操作才會執行。當提交一個任務到包含這種SynchronousQueue佇列的執行緒池後,執行緒池會去檢測是否有可用的執行緒來執行任務,如果沒有則建立一個新的執行緒來執行任務而不是將任務儲存在任務佇列中。”直接切換“的意思是:處理方式由”將該任務暫時儲存在阻塞佇列中“直接切換為”新建一個執行緒來處理任務“。這種執行策略適合處理多個有相互依賴關係的任務,因為該策略可以避免這些任務因一個沒有及時處理而導致依賴於該任務的其他任務也不能及時處理而造成的鎖定結果。因為這種策略的目的是要讓幾乎每一個新提交的任務都能立即得到處理,所以這種策略通常配合maximumPoolSize是無邊際(Integer.MAX_VALUE)的。我們知道的靜態工廠方法Executors.newCachedThreadPool()就是使用了這種直接切換的佇列。

(2)使用無界佇列

不預設佇列的容量,佇列將使用Integer.MAX_VALUE作為預設容量,例如:基於連結串列的阻塞佇列 LinkedBlockingQueue。使用無界佇列使得執行緒池中能建立的最大執行緒數等於核心執行緒數,這樣的執行緒池的maxmumPoolSize的數值將不起任何作用。如果向執行緒池中提交一個新任務時發現所有的核心執行緒都處於執行狀態,那麼該任務將被放入無界佇列中等待處理。當要處理的多個任務之間沒有相互依賴關係的時候,就適合用這種佇列策略來處理這些任務。靜態工廠方法Executors.newFixedThreadPool()就使用了這個佇列。

(3)使用有界佇列

例如使用基於陣列的阻塞佇列 ArrayBlockingQueue。當要求執行緒池的最大執行緒數maximumPoolSize要限定在某個值以內的時候,執行緒池使用有界佇列能降低資源的消耗,但這也使得執行緒池對執行緒的調控變得更加困難。因為佇列容量和執行緒池容量都是有限的值,要想使執行緒處理任務的吞吐量在一個相對合理的範圍內,同時又能使執行緒排程的難度相對較低,並且又儘可能節省系統資源的消耗,那麼需要合理的調配這兩個值。通常來說,設定較大的佇列容量和較小的執行緒池容量,能夠降低系統的資源的消耗(包括CPU的使用率,作業系統的消耗,上下文環境的切換的開銷等),但是會降低系統吞吐率。如果發現提交的任務經常頻繁的發生阻塞的情況,那麼你可以考慮增大執行緒池的容量,可以通過setMaximumPoolSize()方法來重新設定執行緒池的容量。而設定較小的佇列量時,通常需要將執行緒池的容量設定大一點,這種情況下,cpu的使用率會比較高,但是如果設定執行緒池的容量過大的時候,執行緒排程成了問題,反而使得吞吐率比較低。

5.threadFactory

執行緒工廠,用於建立執行緒。預設使用Executors.defaultThreadFactory()方法建立執行緒工廠:

當然我們也可以自己實現ThreadFactory介面去實現我們自己的執行緒工廠。下邊就是可以根據不同的namePrefix去獲取單例執行緒的執行緒工廠:

/**
 * 自定義執行緒工廠類
 */
private static class MyThreadFactory implements ThreadFactory {
    /**
     * namePrefix --> 執行緒名字中的計數
     */
    private static Map<String, AtomicInteger> THREAD_ID_TABLE = new ConcurrentHashMap<>();
    /**
     * 執行緒名稱字首
     */
    private String namePrefix;
    /**
     * 是否後臺執行緒
     */
    private boolean isDamon;
    public MyThreadFactory(String namePrefix) {
        this(namePrefix, true);
    }

    public MyThreadFactory(String namePrefix, boolean isDamon) {
        this.namePrefix = namePrefix;
        this.isDamon = isDamon;
    }


    @Override
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + "-" + generateThreadId(this.namePrefix);
        Thread thread = new Thread(r, threadName);
        thread.setDaemon(this.isDamon);
        System.out.println("建立執行緒" + threadName);
        return thread;
    }

    private static int generateThreadId(String namePrefix) {

        // 判斷後執行 concurrentHashMap不能保證完全執行緒安全 用了putIfAbsent
        if (!THREAD_ID_TABLE.containsKey(namePrefix)) {
            THREAD_ID_TABLE.putIfAbsent(namePrefix, new AtomicInteger(0));
        }
        return THREAD_ID_TABLE.get(namePrefix).getAndIncrement();
    }
}

6.handler

當滿足以下兩個條件其中一個的時候,如果繼續向執行緒池中提交新的任務,那麼執行緒池會呼叫內部的RejectedExecutionHandler物件的rejectedExecution()方法,表示拒絕執行這些新提交的任務:

(1)當執行緒池處於SHUTDOWN狀態時(不論執行緒池和阻塞佇列是否已滿)

(2)當執行緒池中所有的執行緒都處於執行狀態並且執行緒池中的阻塞佇列已滿。

當採用預設的拒絕策略,執行緒池會使用丟擲異常的方式來拒絕新任務的提交,這種拒絕方式線上程池中被稱為AbortPolicy,我們可以來看下有哪些拒絕策略:
(1)AbortPolicy

這中處理方式是直接丟擲RejectedExecutionException異常,如果在ThreadPoolExecutor的建構函式中未指定RejectedExecutionHandler引數,那麼執行緒池將使用defaultHandler引數,而這個就是採用的AbortPolicy。

(2)CallerRunsPolicy

將提交的任務放在ThreadPoolExecutor.execute()方法所在的那個執行緒執行。

(3)DiscardPolicy

直接不執行新提交的任務

(4)DiscardOldestPolicy

這個可以看原始碼中的解釋:

由原始碼就可以知道,這種處理方式有兩種情況:一,當執行緒池處於SHUTDOWN狀態時,就預設不執行這個任務,即DiscardPolicy;二,當執行緒池處於執行狀態時,會將佇列中處於隊首(head)的那個任務從佇列中移除,然後將這個新提交的任務加入到阻塞佇列中的隊尾(tail)等待執行。

當然,RejectedExecutionHandler其實是個介面,我們可以自定義類去實現這個介面,重寫rejectedExecution方法使用自己想要的拒絕策略即可。

下一篇

這篇把執行緒池中的核心引數進行了一些解釋,在下一篇中我們將介紹執行緒池進行任務排程的原理。

相關推薦

併發程式設計——ThreadPoolExecutor原始碼分析

前言 在上一篇中,我們分析了ThreadPoolExecutor中關鍵變數ctl,這篇我們繼續來看ThreadPoolExecutor中的建構函式及其引數。其中引數的相關解釋來源於原始碼中的相關注釋。 建構函式 我們可以看到ThreadPoolExecutor

併發程式設計——ThreadPoolExecutor原始碼分析

前言 執行緒池是併發程式設計中最重要的應用之一,使用執行緒池可以防止大量的建立和銷燬執行緒的過程,可以節省很多的記憶體空間,提高程式的響應率和cpu的利用率,並且也可以對執行緒進行統一管理和監控。這裡將分幾篇文章介紹一下執行緒池的原始碼分析。本篇是分析Threa

Java併發之AQS原始碼分析

我在Java併發之AQS原始碼分析(一)這篇文章中,從原始碼的角度深度剖析了 AQS 獨佔鎖模式下的獲取鎖與釋放鎖的邏輯,如果你把

Flume NG原始碼分析支援執行時動態修改配置的配置模組

在上一篇中講了Flume NG配置模組基本的介面的類,PropertiesConfigurationProvider提供了基於properties配置檔案的靜態配置的能力,這篇細說一下PollingPropertiesFileConfigurationProvider提供的執行時動態修改配置並生效的

GCC原始碼分析——前端

原文連結:http://blog.csdn.net/sonicling/article/details/6706152   從這一篇開始,我們將從原始碼的角度來分析GCC如何完成對C語言原始檔的處理。GCC的內部構架在GCC Internals(搜“gccint.pdf”,或者見[

Glide原始碼分析——從用法來看之load&into方法

上一篇,我們分析了with方法,文章連結: https://blog.csdn.net/qq_36391075/article/details/82833260 在with方法中,進行了Glide的初始化,建立了RequesManger,並且綁定了生命週期,最終返回了一個Reques

YOLOv2原始碼分析

文章全部YOLOv2原始碼分析 接著上一講沒有講完的make_convolutional_layer函式 0x01 make_convolutional_layer //make_convolutional_laye

zigbee 之ZStack-2.5.1a原始碼分析 無線接收控制LED

本文描述ZStack-2.5.1a 模板及無線接收移植相關內容。 main HAL_BOARD_INIT // HAL_TURN_OFF_LED1 InitBoard HalDriverInit HalAdcInit

兄弟連區塊鏈入門教程eth原始碼分析p2p-udp.go原始碼分析

ping方法與pending的處理,之前談到了pending是等待一個reply。 這裡通過程式碼來分析是如何實現等待reply的。pending方法把pending結構體傳送給addpending. 然後等待訊息的處理和接收。 // ping sends a ping message to the giv

Java高併發程式設計之synchronized關鍵字

上一篇文章講了synchronized的部分關鍵要點,詳見:Java高併發程式設計之synchronized關鍵字(一) 本篇文章接著講synchronized的其他關鍵點。 在使用synchronized關鍵字的時候,不要以字串常量作為鎖定物件。看下面的例子: public class

Spring原始碼分析IoC容器的實現1

    Ioc(Inversion of Control)——“控制反轉”,不是什麼技術,而是一種設計思想。在Java開發中,Ioc意味著將你設計好的物件交給容器控制,而不是傳統的在你的物件內部直接控制。理解好Ioc的關鍵是要明確“誰控制誰,控制什麼,為何是反轉(有

tornado原始碼分析之iostream

在事件驅動模型中,所有任務都是以某個事件的回撥函式的方式新增至事件迴圈中的,如:HTTPServer要從socket中讀取客戶端傳送的request訊息,就必須將該socket新增至ioloop中,並設定回掉函式,在回掉函式中從socket中讀取資料,並且檢查request訊息是否全部接收到了,如果

Cat原始碼分析:Server端

初始化 服務端消費客戶端發來的訊息進行分析和展示,所以這個的初始化指的是CatHomeModule的初始化 CatHomeModule依賴TcpSocketReceiver和MessageConsumer,前者用來接收客戶端傳送的訊息,後者用來消費訊息。 TcpSocket

ThreadPoolExecutor原始碼解析

1.ThreadPoolExcuter執行例項 首先我們先看如何新建一個ThreadPoolExecutor去執行執行緒。然後深入到原始碼中去看ThreadPoolExecutor裡面使如何運作的。 public class Test { public

subsampling-scale-image-view載入長圖原始碼分析

subsampling-scale-image-view原始碼分析概要分析總結 概要 subsampling-scale-image-view是一個支援部分載入大圖長圖的圖片庫,並且還支援縮放,在subsampling-scale-image-view載入長圖原

Spring component-scan原始碼分析 -- @Configuration註解處理

上篇文章Spring component-scan原始碼分析(一) – XML解析分析了Spring解析<context:component-scan …/>標籤時,把掃描到的合適的類封裝成BeanDefinition加入Sping容器中,本篇分析S

Spring原始碼分析IoC容器的實現3

BeanDefinition的載入和解析     這個載入過程,相當於把定義的BeanDefinition在IoC容器中轉化成一個Spring內部表示的資料結構的過程。IoC容器對Bean的管理和依賴注入功能的實現,是通過對其持有的BeanDefinition進

Spring原始碼分析IoC容器的實現2

IoC容器的初始化過程     簡單來說IoC容器的初始化是由refresh()方法啟動的,這個方法標誌著IoC容器的正式啟動。這個啟動包括BeanDefinition的Resouce定位、載入和註冊三個基本過程。     第一

groupcache 原始碼分析-- LRU

lru部分的程式碼在lru/lru.go檔案中,它主要是封裝了一系列lru演算法相關的介面,供groupcahe進行快取置換相關的呼叫。 它主要封裝了下面幾個介面: // 建立一個Cache func New(maxEntries int) *Cache /

Spark2.3.2原始碼解析: 7. SparkContext原始碼分析 :TaskScheduler

    程式碼部分:   啟動指令碼 --name spark-test --class WordCount --master yarn --deploy-mode cluster /A/spark-test.jar /