1. 程式人生 > >Java學習筆記 執行緒池使用及詳解

Java學習筆記 執行緒池使用及詳解

有點笨,參考了好幾篇大佬們寫的文章才整理出來的筆記....

字面意思上解釋,執行緒池就是裝有執行緒的池,我們可以把要執行的多執行緒交給執行緒池來處理,和連線池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的複用。

好處

多執行緒產生的問題

一般我們使用到多執行緒的程式設計的時候,需要通過new Thread(xxRunnable).start()建立並開啟執行緒,我們可以使用多執行緒來達到最優效率(如多執行緒下載)。

但是,執行緒不是越多就越好,執行緒過多,建立和銷燬就會消耗系統的資源,也不方便管理。

除此之外,多執行緒還會造成併發問題,執行緒併發數量過多,搶佔系統資源從而導致阻塞。

執行緒池優點

我們將執行緒放入執行緒池,由執行緒池對執行緒進行管理,可以對執行緒池中緩衝的執行緒進行復用,這樣,就不會經常去建立和銷燬執行緒了,從而省下了系統的資源。

執行緒池能夠有效的控制執行緒併發的數量,能夠解決多執行緒造成的併發問題。

除此之外,執行緒池還能夠對執行緒進行一定的管理,如延時執行、定時迴圈執行的策略等

執行緒池實現

執行緒池的實現,主要是通過這個類ThreadPoolExecutor,其的構造引數非常長,我們先大概瞭解,之後再進行詳細的介紹。

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,long keepAliveTime,
    TimeUnit unit,BlockingQueue workQueue,
    RejectedExecutionHandler handler)
  • corePoolSize:執行緒池核心執行緒數量
  • maximumPoolSize:執行緒池最大執行緒數量
  • keepAliverTime:當活躍執行緒數大於核心執行緒數時,空閒的多餘執行緒最大存活時間
  • unit:存活時間的單位
  • workQueue:存放執行緒的工作佇列
  • handler:超出執行緒範圍和佇列容量的任務的處理程式(拒絕策略)

這裡大概簡單說明一下執行緒池的執行流程:

當執行緒被新增到執行緒池中,如果執行緒池中的當前的執行緒數量等於執行緒池定義的最大核心執行緒數量(corePoolSize)了,此執行緒就會別放入執行緒的工作佇列(workQueue)中,等待執行緒池的呼叫。

Java提供了一個工具類Excutors

,方便我們快速建立執行緒池,其底層也是呼叫了ThreadPoolExecutor

不過阿里巴巴Java規範中強制要求我們應該通過ThreadPoolExecutor來建立自己的執行緒池,使用Excutors容易造成OOM問題。

所以,我們先從Excutors開始學習,之後在對ThreadPoolExecutor進行詳細的講解

Excutors

由於Excutors是工具類,所以下面的介紹的都是其的靜態方法,如果是比較執行緒數目比較少的小專案,可以使用此工具類來建立執行緒池

PS:把執行緒提交給執行緒池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道執行緒的結果就使用execute方法,效能會好很多。
  2. submit返回一個Future物件,如果想知道執行緒結果就使用submit提交,而且它能在主執行緒中通過Future的get方法捕獲執行緒中的異常

執行緒池可以接收兩種的引數,一個為Runnable物件,另外則是Callable物件

Callable是JDK1.5時加入的介面,作為Runnable的一種補充,允許有返回值,允許丟擲異常。

主要的幾個靜態方法:

方法 說明
newFixedThreadPool(int nThreads) 建立固定大小的執行緒池
newSingleThreadExecutor() 建立只有一個執行緒的執行緒池
newCachedThreadPool() 建立一個不限執行緒數上限的執行緒池,任何提交的任務都將立即執行
newScheduledThreadPool(int nThreads) 建立一個支援定時、週期性或延時任務的限定執行緒數目的執行緒池
newSingleThreadScheduledExecutor() 建立一個支援定時、週期性或延時任務的單個執行緒的執行緒池

1.newSingleThreadExecutor

建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行,我們可以使用它來達到控制執行緒順序執行。

控制程序順序執行:

Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("這是執行緒1");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
Thread thread2 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是執行緒2");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Thread thread3 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是執行緒3");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
//建立執行緒池物件
ExecutorService executorService = Executors.newSingleThreadExecutor();
//把執行緒新增到執行緒池中
executorService.submit(thread1);
executorService.submit(thread2);
executorService.submit(thread3);

之後出現的結果就是按照順序輸出

2.newFixedThreadPool

建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()

3.newCachedThreadPool

建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒,執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

程式碼:

//建立了一個自定義的執行緒
public class MyThread extends Thread {
    private int index;

    public MyThread(int index) {
        this.index = index;
    }

    @Override
    public void run() {
        System.out.println(index+" 當前執行緒"+Thread.currentThread().getName());
    }
}

//建立快取執行緒池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    executorService.execute(new MyThread(i));
    try {
        //這裡模擬等待時間,等待執行緒池複用回收執行緒
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

可以看到結果都是使用的同一個執行緒

4.newScheduledThreadPool

建立一個定長執行緒池,支援定時、週期性或延時任務執行

延遲1s後啟動執行緒:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(new MyThread(1),1, TimeUnit.SECONDS);

ThreadPoolExecutor

構造方法

上面提到的那個構造方法其實只是ThreadPoolExecutor類中的一個,ThreadPoolExecutor類中存在有四種不同的構造方法,主要區別就是引數不同。

//五個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個引數的建構函式-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個引數的建構函式-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

首先,有個概念需要明白,執行緒池的最大執行緒數(執行緒總數,maximumPoolSize)= 核心執行緒數(corePoolSize)+非核心執行緒數

  • corePoolSize:執行緒池核心執行緒數量
  • maximumPoolSize:執行緒池最大執行緒數量
  • keepAliverTime:當活躍執行緒數大於核心執行緒數時,空閒的多餘執行緒最大存活時間
  • unit:存活時間的單位
  • workQueue:存放執行緒的工作佇列
  • handler:超出執行緒範圍和佇列容量的任務的處理程式(拒絕策略)

核心執行緒和非核心執行緒有什麼區別呢?

核心執行緒是永遠不會被執行緒池丟棄回收(即使核心執行緒沒有工作),非核心執行緒則是超過一定時間(keepAliverTime)則就會被丟棄

workQueue

當所有的核心執行緒都在工作時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務

1.SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現執行緒數達到了maximumPoolSize而不能新建執行緒的錯誤,使用這個型別佇列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大

2.LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了maximumPoolSize的設定失效,因為匯流排程數永遠不會超過corePoolSize

3.ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒(非核心執行緒)執行任務,又如果匯流排程數到了maximumPoolSize,並且佇列也滿了,則發生錯誤

4.DelayQueue:佇列內元素必須實現Delayed介面,這就意味著你傳進去的任務必須先實現Delayed介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

拒絕策略:

拒絕策略 拒絕行為
AbortPolicy 丟擲RejectedExecutionException異常(預設)
DiscardPolicy 不處理,丟棄掉
DiscardOldestPolicy 丟棄執行佇列中等待最久的一個任務,嘗試為新來的任務騰出位置
CallerRunsPolicy 直接由提交任務者執行這個任務

兩種方法設定拒絕策略:

//ThreadPoolExecutor物件的setRejectedExecutionHandler方法設定
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//構造方法進行設定,省略

執行緒池預設的拒絕行為是AbortPolicy,也就是丟擲RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。

如果不關心任務被拒絕的事件,可以將拒絕策略設定成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

ThreadFactory

一個介面類,用來對執行緒進行設定,需要實現newThread(Runnable r)方法

官方的文件說明:

newThread此方法一般來初始化執行緒的優先順序(priority),名字(name),守護程序(daemon)或執行緒組(ThreadGroup)

簡單的例子(讓某個類實現ThreadFactory介面):

@Override
public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    return thread;
}

執行緒池獲取執行結果

PS:把執行緒提交給執行緒池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道執行緒的結果就使用execute方法,效能會好很多。
  2. submit返回一個Future物件,如果想知道執行緒結果就使用submit提交,而且它能在主執行緒中通過Future的get方法捕獲執行緒中的異常

執行緒池可以接收兩種的引數,一個為Runnable物件,另外則是Callable物件

Callable是JDK1.5時加入的介面,作為Runnable的一種補充,允許有返回值,允許丟擲異常。

執行緒池的處理結果、以及處理過程中的異常都被包裝到Future中,並在呼叫Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。

獲取執行結果的程式碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            //該異常會在呼叫Future.get()時傳遞給呼叫者
            throw new RuntimeException("exception in call~");
        }
    });
    
try {
    //獲得返回結果
    Object result = future.get();
    
    
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

執行緒池執行流程

一個形象的比喻說明執行緒池的流程:

規定:

  1. 執行緒池比作成一家公司
  2. 公司的最大員工數為maximumPoolSize
  3. 最大正式員工數為coolPoolSize(核心執行緒的總數)
  4. 最大員工數(maximumPoolSize) = 最大正式員工(coolPoolSize)和臨時工(非核心執行緒)
  5. 單子(任務)可看做為執行緒
  6. 佇列使用的是ArrayBlockingQueue
  7. 一個員工只能幹一個任務

最開始的時候,公司是沒有一名員工。之後,公司接到了單子(任務),這個時候,公司才去找員工(建立核心執行緒並讓執行緒開始執行),這個時候找到的員工就是正式員工了。

公司的聲譽越來越好,於是來了更多的單子,公司繼續招人,直到正式員工數量達到最大的正式員工的數量(核心執行緒數量已達到最大)

於是,多出來的單子就暫時地存放在了佇列中,都在排隊,等待正式員工們把手頭的工作做完之後,就從佇列中依次取出單子繼續工作。

某天,來了一個新單子,但是這個時候佇列已經滿了,公司為了自己的信譽和聲譽著想,不得已只能去找臨時工(建立非核心執行緒)來幫忙開始進行工作(負責新單子)

在此之後,又來了新單子,公司繼續去招臨時工為新來的單子工作,直到正式工和臨時工的數量已經達到了公司最大員工數。

這個時候,公司沒有辦法了,只能拒絕新來的單子了(拒絕策略)

此時,正式工和臨時工都是在加班加點去從佇列中取出任務來工作,終於某一天,佇列的已經沒有單子了,市場發展不好,單子越來越少,臨時工很久都不工作了(非核心執行緒超過了最大存活時間keepAliveTime),公司就把這些臨時工解僱了,直到剩下只有正式員工。

PS:如果也想要解僱正式員工(銷燬核心執行緒),可以設定ThreadPoolExecutor物件的的allowCoreThreadTimeOut這個屬性為true

個人理解,可能不是很正確,僅供參考!

執行緒池關閉

方法 說明
shutdown() 不再接受新的任務,之前提交的任務等執行結束再關閉執行緒池
shutdownNow() 不再接受新的任務,試圖停止池中的任務再關閉執行緒池,返回所有未處理的執行緒list列表。

參考連結:
java中常用執行緒池的:newCachedThreadPool
Java執行緒池詳解
Java 執行緒池的認識和使用
Java 執行緒池全面解析
執行緒池,這一篇或許就夠了
Java執行緒池的執行原理以及使用詳