1. 程式人生 > >Java併發程式設計之執行緒池的使用

Java併發程式設計之執行緒池的使用

1. 為什麼要使用多執行緒?

隨著科技的進步,現在的電腦及伺服器的處理器數量都比較多,以後可能會越來越多,比如我的工作電腦的處理器有8個,怎麼檢視呢?

計算機右鍵--屬性--裝置管理器,開啟屬性視窗,然後點選“裝置管理器”,在“處理器”下可看到所有的處理器:

也可以通過以下Java程式碼獲取到處理器的個數:

System.out.println("CPU個數:" + Runtime.getRuntime().availableProcessors());

執行結果如下所示:

CPU個數:8

既然處理器的個數增加了,如果還使用傳統的序列程式設計,就有點浪費資源了,因此,為了提高資源利用率,讓各個處理器都忙碌起來,就需要引入併發程式設計,要引入併發程式設計,就引入了多執行緒。

可以說,使用多執行緒的最直接目的就是為了提高資源利用率,資源的利用率提高了,系統的吞吐率也就相應提高了。

2. 為什麼要使用執行緒池?

在一定的範圍內,增加執行緒可以提高應用程式的吞吐率,但執行緒並不是越多越好(因為執行緒的建立與銷燬都需要很大的開銷),如果超過了某個範圍,不僅會降低應用程式的執行速度,嚴重的話,應用程式甚至會崩潰,以至於不得不重啟應用程式。

為了避免這種問題,就需要對應用程式可以建立的執行緒數量進行限制,確保線上程數量達到限制時,程式也不會耗盡資源,執行緒池就是為了解決這種問題而出現的。

執行緒池:管理一組工作執行緒的資源池。

執行緒池與工作佇列密切相關,工作佇列中儲存了所有等待執行的任務。

工作者執行緒的任務就是從工作佇列中獲取一個任務,執行任務,然後返回執行緒池並等待下一個任務。

使用執行緒池可以帶來以下好處:

  1. 通過重用現有的執行緒而不是建立新執行緒,可以在處理多個任務時減少線上程建立與銷燬過程中產生的巨大開銷。
  2. 當任務到達時,工作執行緒通常已經存在,因此不會由於等待建立執行緒而延遲任務的執行,從而提高了響應性。
  3. 可以通過調整執行緒池的大小,建立足夠多的執行緒使處理器保持忙碌狀態,同時還可以防止過多執行緒相互競爭資源而使應用程式耗盡記憶體或崩潰。

3. 建立執行緒池

3.1 使用Executors靜態方法建立(不推薦)

Executors類提供了以下4個靜態方法來快速的建立執行緒池:

  1. newFixedThreadPool
  2. newCachedThreadPool
  3. newSingleThreadExecutor
  4. newScheduledThreadPool

首先看下newFixedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newFixedThreadPool(10);

它的原始碼為:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

說明:newFixedThreadPool將建立一個固定長度的執行緒池,每當提交一個任務時就建立一個執行緒,直到達到執行緒池的最大數量,這時執行緒池的規模將不再變化(如果某個執行緒由於發生了未預期的Exception而結束,那麼執行緒池會補充一個新的執行緒)。

然後看下newCachedThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newCachedThreadPool();

它的原始碼為:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

說明:newCachedThreadPool將建立一個可快取的執行緒池,如果執行緒池的規模超過了處理需求時,那麼將回收空閒的執行緒,而當需求增加時,則新增新的執行緒,執行緒池的最大規模為Integer.MAX_VALUE。

然後看下newSingleThreadExecutor()方法的使用方式:

ExecutorService threadPool = Executors.newSingleThreadExecutor();

它的原始碼為:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

說明:newSingleThreadExecutor是一個單執行緒的Executor,它建立單個工作者執行緒來執行任務,如果這個執行緒異常結束,就建立一個新的執行緒來替代。

newSingleThreadExecutor可以確保依照任務在佇列中的順序來序列執行。

最後看下newScheduledThreadPool()方法的使用方式:

ExecutorService threadPool = Executors.newScheduledThreadPool(10);

它的原始碼為:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

super指向如下程式碼:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

說明:newScheduledThreadPool將建立一個固定長度的執行緒池,而且以延遲或者定時的方式來執行任務,類似於Timer。

可以發現,以上4種方式最終都指向了ThreadPoolExecutor的以下建構函式,只是很多引數沒讓你指定,傳遞了預設值而已:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 省略具體的程式碼
}

雖然使用這4個方法可以快速的建立執行緒池,但還是不推薦使用,第一,很多引數都設定了預設值,不便於你理解各個引數的具體含義,第二,引數的預設值可能會造成一定的問題,最好是由使用者根據自己的需求自行指定。

那麼這7個引數分別代表什麼含義呢?請接著往下看。

3.2 使用ThreadPoolExecutor建構函式建立(推薦)

ThreadPoolExecutor共有以下4個建構函式,推薦使用這種方式來建立執行緒池:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

以上3個也都指向引數最全的第4個建構函式:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 省略具體的程式碼
}

以下為各個引數的講解:

  • corePoolSize:核心執行緒數。

  • maximumPoolSize:最大執行緒數。

    最大執行緒數=核心執行緒數+非核心執行緒數。

  • keepAliveTime:非核心執行緒閒置超時時間。

    一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉,如果設定了allowCoreThreadTimeOut = true,則會作用於核心執行緒。

  • unit:引數keepAliveTime的時間單位,如秒、分、小時。

  • workQueue:工作佇列,即要執行的任務佇列,裡面儲存等待執行的任務。

    這裡的阻塞佇列可選擇的有:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue、DelayedWorkQueue。

    newFixedThreadPool()方法預設使用的LinkedBlockingQueue,

    newCachedThreadPool()方法預設使用的SynchronousQueue,

    newSingleThreadExecutor()方法預設使用的LinkedBlockingQueue,

    newScheduledThreadPool()方法預設使用的DelayedWorkQueue。

  • threadFactory:執行緒工廠,用來用來建立執行緒。

  • handler:飽和策略/拒絕處理任務時的策略。

    當workQueue已滿,並且執行緒池的執行緒數已達到maximumPoolSize,此時新提交的任務會交由RejectedExecutionHandler handler處理,主要有以下4種策略:

    AbortPolicy:中止策略,拋棄任務並丟擲未檢查的RejectedExecutionException,這也是預設的飽和策略。

    DiscardPolicy:拋棄策略,直接拋棄任務,但不丟擲異常。

    DiscardOldestPolicy:拋棄最舊的策略,拋棄下一個將被執行的任務,然後嘗試重新提交新的任務。

    CallerRunsPolicy:呼叫者執行策略,將任務回退到呼叫者,在呼叫者所在的執行緒執行該任務。

4. 執行緒池的執行原理

可以通過下面2張圖來理解執行緒池的執行原理:

1)如果執行緒池中的執行緒小於corePoolSize,則建立新執行緒來處理任務,這時建立的執行緒為核心執行緒。

2)如果執行緒中的執行緒等於或者大於corePoolSize,則將任務放到工作佇列中,即上圖中的BlockingQueue。

3)如果工作佇列已滿,無法將任務加入到BlockingQueue,則建立新的執行緒來處理任務,這時建立的執行緒為非核心執行緒,非核心執行緒在空閒一段時間後會被回收銷燬掉(keepAliveTime和unit就是用來定義這個空閒的時間是多少)。

4)如果建立新執行緒導致執行緒池中的執行緒數超過了maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。

5. ThreadPoolExecutor示例

新建如下示例程式碼,建立1個corePoolSize為2,maximumPoolSize為3的執行緒池:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行執行緒:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            System.out.println("任務2執行執行緒:" + Thread.currentThread().getName());
        });
    }
}

執行結果為:

任務2執行執行緒:pool-1-thread-2

任務1執行執行緒:pool-1-thread-1

可以看出,因為執行緒池中的執行緒數小於corePoolSize,執行緒池建立了2個核心執行緒來分別執行任務1和任務2。

修改程式碼為如下所示,開啟3個任務:

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(3 * 1000);
                System.out.println("任務1執行執行緒:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(5 * 1000);
                System.out.println("任務2執行執行緒:" + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        threadPoolExecutor.execute(() -> System.out.println("任務3執行執行緒:" + Thread.currentThread().getName()));
    }
}

執行結果為:

任務1執行執行緒:pool-1-thread-1

任務3執行執行緒:pool-1-thread-1

任務2執行執行緒:pool-1-thread-2

可以看出,執行任務3時並沒有新建執行緒,而是先放入了工作佇列,最後由執行緒1執行完成。

在上面的程式碼中新增個任務4:

threadPoolExecutor.execute(() -> System.out.println("任務4執行執行緒:" + Thread.currentThread().getName()));

此時執行結果為:

任務4執行執行緒:pool-1-thread-3

任務3執行執行緒:pool-1-thread-3

任務1執行執行緒:pool-1-thread-1

任務2執行執行緒:pool-1-thread-2

可以看出,任務3是先放入了工作佇列,任務4放不到工作佇列(空間已滿),所以建立了第3個執行緒來執行,執行完畢後從佇列裡獲取到任務3執行,任務1和任務2分別由執行緒1和執行緒2執行。

修改下任務4的程式碼,並新增任務5:

threadPoolExecutor.execute(() -> {
    try {
        Thread.sleep(2 * 1000);
        System.out.println("任務4執行執行緒:" + Thread.currentThread().getName());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

threadPoolExecutor.execute(() -> System.out.println("任務5執行執行緒:" + Thread.currentThread().getName()));

此時執行結果為:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPoolExecutorTest$$Lambda$5/935044096@179d3b25 rejected from java.util.concurrent.ThreadPoolExecutor@254989ff[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:37)
任務4執行執行緒:pool-1-thread-3

任務3執行執行緒:pool-1-thread-3

任務1執行執行緒:pool-1-thread-1

任務2執行執行緒:pool-1-thread-2

可以看出,當提交任務5時,由於工作佇列已滿, 且執行緒池中的執行緒數已經為3,所以該任務被拋棄並丟擲了java.util.concurrent.RejectedExecutionException異常。

如果你看到了這裡,是否會好奇引數maximumPoolSize設定為多少合適呢?

這個問題,我們下次講解,歡迎持續關注,哈哈!

6. 原始碼及參考

Brian Goetz《Java併發程式設計實戰》

怎麼檢視處理器(cpu)的核數

ThreadPoolExecutor使用方法

Java執行緒池-ThreadPoolExecutor原理分析與實戰

深入理解 Java 多執行緒核心知識:跳槽面試必備

網際網路大廠Java面試題:使用無界佇列的執行緒池會導致記憶體飆升嗎?【石杉的架構筆記】

最後,歡迎關注我的微信公眾號:「申城異鄉人」,所有部落格會同步更新