1. 程式人生 > >詳解 Java 執行緒池

詳解 Java 執行緒池

網上講解執行緒池的文章一搜一大把,講解的還不錯的文章仔細找找也是有的。

為了讓您的這兩塊錢花的物有所值,本文一定讓您對執行緒池有一個全新的瞭解,並讓您在今後的工作中,能夠熟練使用執行緒池。

首先說說為什麼用執行緒池?

執行緒的建立 / 銷燬伴隨著系統開銷,過於頻繁的建立 / 銷燬執行緒,會很大程度上影響處理效率。

例如:記建立執行緒消耗時間 T1,執行任務消耗時間 T2,銷燬執行緒消耗時間 T3

如果 T1 + T3 > T2,那麼是不是說開啟一個執行緒來執行這個任務太不划算了! 正好,執行緒池快取執行緒,可用已有的閒置執行緒來執行新任務,避免了 T1 + T3 帶來的系統開銷。

執行緒池種類

再說執行緒池種類之前,先說一下初始化執行緒池的幾個引數,如果這個引數弄明白了,對於執行緒池你就基本上就可以瞭解了。

先來看一下執行緒池的幾個建構函式。

 //五個引數的建構函式

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)
//六個引數的建構函式 -1

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個引數的建構函式 -2

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)
//七個引數的建構函式

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

int corePoolSize => 該執行緒池中核心執行緒數最大值

1. 核心執行緒

執行緒池新建執行緒的時候,如果當前執行緒總數小於 corePoolSize,則新建的是核心執行緒,如果超過 corePoolSize,則新建的是非核心執行緒

核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹 (閒置狀態)。

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 這個屬性為 true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉

很好理解吧,正常情況下你不幹活我也養你,因為我總有用到你的時候,但有時候特殊情況 (比如我自己都養不起了),那你不幹活我就要把你幹掉了

2. 執行緒總數

int maximumPoolSize=> 該執行緒池中執行緒總數最大值執行緒總數 = 核心執行緒數 + 非核心執行緒數。

核心執行緒在上面解釋過了,這裡說下非核心執行緒:不是核心執行緒的執行緒(別激動,把刀放下...),其實在上面解釋過了。

3. 超時時間

long keepAliveTime => 該執行緒池中非核心執行緒閒置超時時長 一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉。 如果設定 allowCoreThreadTimeOut = true,則會作用於核心執行緒

4. 時間單位

TimeUnit unit keepAliveTime 的單位,TimeUnit 是一個 列舉型別,其包括:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時
DAYS : 天

5. 佇列

BlockingQueue<Runnable> workQueue 該執行緒池中的任務佇列:維護著等待執行的 Runnable 物件

當所有的核心執行緒都在幹活時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務

6. 常用的 workQueue 型別

  • SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即無限大

  • LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了 maximumPoolSize 的設定失效,因為匯流排程數永遠不會超過 corePoolSize

  • ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到 corePoolSize 的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒 (非核心執行緒) 執行任務,又如果匯流排程數到了 maximumPoolSize,並且佇列也滿了,則發生錯誤

  • DelayQueue:佇列內元素必須實現 Delayed 介面,這就意味著你傳進去的任務必須先實現 Delayed 介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

7. ThreadFactory

建立執行緒的方式,這是一個介面,你 new 他的時候需要實現他的Thread newThread(Runnable r)方法,一般用不上。

小夥伴應該知道 AsyncTask 是對執行緒池的封裝吧?那就直接放一個 AsyncTask 新建執行緒池的 threadFactory 引數原始碼吧:

    new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread new Thread(Runnable r) {
               return new Thread(r,"AsyncTask#"+mCount.getAndIncrement());
        }
    }

這麼簡單?就給執行緒起了個名!

8. RejectedExecutionHandler

這玩意兒就是丟擲異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler丟擲異常,你不指定他也有個預設的

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常 ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)。 ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

9. ThreadPoolExecutor 的策略

上面介紹引數的時候其實已經說到了 ThreadPoolExecutor 執行的策略,這裡給總結一下,當一個任務被新增進執行緒池時:

1.執行緒數量未達到corePoolSize,則新建一個執行緒(核心執行緒)執行任務 2.執行緒數量達到了 corePools,則將任務移入佇列等待 3.佇列已滿,新建執行緒(非核心執行緒)執行任務 4.佇列已滿,匯流排程數又達到了 maximumPoolSize,就會由上面那位星期天 (RejectedExecutionHandler) 丟擲異常

常見四種執行緒池

如果你不想自己寫一個執行緒池,那麼你可以從下面看看有沒有符合你要求的 (一般都夠用了),如果有,那麼很好你直接用就行了,如果沒有,那你就老老實實自己去寫一個吧。

Java 通過 Executors 提供了四種執行緒池,這四種執行緒池都是直接或間接配置 ThreadPoolExecutor 的引數實現的,下面我都會貼出這四種執行緒池建構函式的原始碼,各位大佬們一看便知!

CachedThreadPool

優勢:

1.執行緒數無限制
2.有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
3.一定程式減少頻繁建立/銷燬執行緒,減少系統開銷

建立方法:

    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

    }

FixedThreadPool

優勢:

1.可控制執行緒最大併發數(同時執行的執行緒數)
2.超出的執行緒會在佇列中等待

    建立方法:
    //nThreads => 最大執行緒數即maximumPoolSize
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
    //threadFactory => 建立執行緒的方法,這就是我叫你別理他的那個星期六!你還看!
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
    原始碼:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
    }

2個引數的構造方法原始碼,不用我貼你也知道他把星期六放在了哪個位置!所以我就不貼了,省下篇幅給我扯皮

ScheduledThreadPool

支援定時及週期性任務執行。

    //建立方法:
    //nThreads => 最大執行緒數即maximumPoolSize
    ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
    //原始碼:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //ScheduledThreadPoolExecutor():public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

SingleThreadExecutor

優勢:

1.有且僅有一個工作執行緒執行任務
2.所有任務按照指定順序執行,即遵循佇列的入隊出隊規則

    //建立方法:
    ExecutorService singleThreadPool = Executors.newSingleThreadPool();
    //原始碼:
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
   // 還有一個Executors.newSingleThreadScheduledExecutor()結合了 3 和 4,就不介紹了,基本不用

準確的指定引數大小

系統負載

引數的設定跟系統的負載有直接的關係,下面為系統負載的相關引數:

  • tasks:每秒需要處理的最大任務數量
  • tasktime:處理第個任務所需要的時間
  • responsetime:系統允許任務最大的響應時間,比如每個任務的響應時間不得超過2秒。

引數設定

corePoolSize:

每個任務需要 tasktime 秒處理,則每個執行緒每鈔可處理 1 / tasktime 個任務。

系統每秒有 tasks 個任務需要處理,則需要的執行緒數為:tasks / ( 1 / tasktime ),即 tasks * tasktime 個執行緒數。

假設系統每秒任務數為 100 ~ 1000,每個任務耗時 0.1 秒,則需要 100 * 0.1 至 1000 * 0.1,即 10 ~ 100 個執行緒。

那麼 corePoolSize 應該設定為大於 10,具體數字最好根據 8020 原則,即 80 % 情況下系統每秒任務數,若系統80%的情況下第秒任務數小於 200,最多時為 1000,則 corePoolSize 可設定為 20。

queueCapacity:

任務佇列的長度要根據核心執行緒數,以及系統對任務響應時間的要求有關。 佇列長度可以設定為 ( corePoolSize / tasktime ) * responsetime : ( 20 / 0.1 ) * 2 = 400,即佇列長度可設定為 400。

佇列長度設定過大,會導致任務響應時間過長,切忌以下寫法:

 LinkedBlockingQueue queue = new LinkedBlockingQueue();

這實際上是將佇列長度設定為Integer.MAX_VALUE,將會導致執行緒數量永遠為corePoolSize,再也不會增加,當任務數量陡增時,任務響應時間也將隨之陡增。

maxPoolSize:

當系統負載達到最大值時,核心執行緒數已無法按時處理完所有任務,這時就需要增加執行緒。

每秒 200 個任務需要 20 個執行緒,那麼當每秒達到 1000 個任務時,則需要 ( 1000 - queueCapacity ) * ( 20 / 200 ),即 60 個執行緒,可將 maxPoolSize 設定為 60。

keepAliveTime:

執行緒數量只增加不減少也不行。當負載降低時,可減少執行緒數量,如果一個執行緒空閒時間達到 keepAliveTiime,該執行緒就退出。 預設情況下執行緒池最少會保持 corePoolSize 個執行緒。

allowCoreThreadTimeout:

預設情況下核心執行緒不會退出,可通過將該引數設定為 true,讓核心執行緒也退出。

須知:

以上關於執行緒數量的計算並沒有考慮 CPU 的情況。

若結合 CPU 的情況,比如,當執行緒數量達到 50 時,CPU 達到 100%,則將 maxPoolSize 設定為 60 也不合適,此時若系統負載長時間維持在每秒 1000個 任務,則超出執行緒池處理能力,應設法降低每個任務的處理時間 ( tasktime )。

在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,(除非呼叫了 prestartAllCoreThreads () 或者 prestartCoreThread () 方法,從這 2 個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立 corePoolSize 個執行緒或者一個執行緒)。

執行緒池監控

線上程池中,執行緒池的監控也是很重要的一個點。

我們經常會注意的點有當前的排隊執行緒池數,當前的活動執行緒數,執行完成執行緒數,匯流排程數。

private static ExecutorService executor = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(100000));

public static void main(String[] args) throws Exception {
    for (int i = 0; i < 100000; i++) {
        executor.execute(() -> {
            System.out.print(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) executor);

    while (true) {
        System.out.println();

        int queueSize = tpe.getQueue().size();
        System.out.println("當前排隊執行緒數:" + queueSize);

        int activeCount = tpe.getActiveCount();
        System.out.println("當前活動執行緒數:" + activeCount);

        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("執行完成執行緒數:" + completedTaskCount);

        long taskCount = tpe.getTaskCount();
        System.out.println("匯流排程數:" + taskCount);

        Thread.sleep(3000);
    }

}