1. 程式人生 > >Java 中的幾種線程池這麽用才是對的

Java 中的幾種線程池這麽用才是對的

start java開發手冊 nds 細節 div 進入 args rdo 等待

為什麽要使用線程池

雖然大家應該都已經很清楚了,但還是說一下。其實歸根結底最主要的一個原因就是為了提高性能。

線程池和數據庫連接池是同樣的道理,數據庫連接池是為了減少連接建立和釋放帶來的性能開銷。而線程池則是為了減少線程建立和銷毀帶來的性能消耗。

以 web 項目為例,有以下兩種情況:

1、每次過來一個請求,都要在服務端創建一個新線程來處理請求,請求處理完成銷毀線程;

2、每次過來一個請求,服務端在線程池中直接拿過一個空閑的線程來處理這個請求,處理完成後還給線程池;

答案是肯定的,肯定是第二種使用線程池的方式性能更好。

除了性能這個最重要的原因外,線程池的使用可以幫助我們更合理的使用系統資源。還是以 web 項目為例,如果我們在服務端不使用線程池,而是無節制的來一個請求創建一個線程,系統資源將會很快被耗盡。而使用線程池的話,則可以防止這種情況發生,當然這要建立在正確合理的使用線程池的基礎上,要固定線程的最大數以及等待隊列的大小。

幾種線程池的使用和原理

線程池固然好用,但是要建立在正確的使用方式的基礎上,如果使用方式不當,同樣會出現問題。接下來就介紹一下幾種線程池的使用。

在大名鼎鼎的 J.U.C 包下已經提供了 Executors 類,它已經封裝實現了四種創建線程池的方式,它暴露出幾個簡單的方法供開發者調用。最終都是通過 new ThreadPoolExecutor() ExecutorService 實例,從而得到我們想要的線程池類型。這樣做其實有利有弊,好的是我們不用關心那麽多參數,只需要簡單的指定一兩個參數就可以;不好的是,這樣一來又屏蔽了很多細節,如果有些參數使用默認的,而開發者又不了解原理的情況下,可能會造成 OOM 等問題。

很多公司都不建議或者強制不允許直接使用 Executors 類提供的方法來創建線程池,例如阿裏巴巴Java開發手冊裏就明確不允許這樣創建線程池,一定要通過 ThreadPoolExecutor(xx,xx,xx...) 來明確線程池的運行規則,指定更合理的參數。

先來看一下 ThreadPoolExecutor 的幾個參數和它們的意義,先來看一下它最完整參數的重載。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

一共有 7 個參數。

corePoolSize

核心線程數,當有任務進來的時候,如果當前線程數還未達到 corePoolSize 個數,則創建核心線程,核心線程有幾個特點:

1、當線程數未達到核心線程最大值的時候,新任務進來,即使有空閑線程,也不會復用,仍然新建核心線程;

2、核心線程一般不會被銷毀,即使是空閑的狀態,但是如果通過方法 allowCoreThreadTimeOut(boolean value) 設置為 true 時,超時也同樣會被銷毀;

3、生產環境首次初始化的時候,可以調用 prestartCoreThread() 方法來預先創建所有核心線程,避免第一次調用緩慢;

maximumPoolSize

除了有核心線程外,有些策略是當核心線程完全無空閑的時候,還會創建一些臨時的線程來處理任務,maximumPoolSize 就是核心線程 + 臨時線程的最大上限。臨時線程有一個超時機制,超過了設置的空閑時間沒有事兒幹,就會被銷毀。

keepAliveTime

這個就是上面兩個參數裏所提到的超時時間,也就是線程的最大空閑時間,默認用於非核心線程,通過 allowCoreThreadTimeOut(boolean value) 方法設置後,也會用於核心線程。

unit

這個參數配合上面的 keepAliveTime ,指定超時的時間單位,秒、分、時等。

workQueue

等待執行的任務隊列,如果核心線程沒有空閑的了,新來的任務就會被放到這個等待隊列中。這個參數其實一定程度上決定了線程池的運行策略,為什麽這麽說呢,因為隊列分為有界隊列和無界隊列。

有界隊列:隊列的長度有上限,當核心線程滿載的時候,新任務進來進入隊列,當達到上限,有沒有核心線程去即時取走處理,這個時候,就會創建臨時線程。(警惕臨時線程無限增加的風險)

無界隊列:隊列沒有上限的,當沒有核心線程空閑的時候,新來的任務可以無止境的向隊列中添加,而永遠也不會創建臨時線程。(警惕任務隊列無限堆積的風險)

threadFactory

它是一個接口,用於實現生成線程的方式、定義線程名格式、是否後臺執行等等,可以用 Executors.defaultThreadFactory() 默認的實現即可,也可以用 Guava 等三方庫提供的方法實現,如果有特殊要求的話可以自己定義。它最重要的地方應該就是定義線程名稱的格式,便於排查問題了吧。

handler

當沒有空閑的線程處理任務,並且等待隊列已滿(當然這只對有界隊列有效),再有新任務進來的話,就要做一些取舍了,而這個參數就是指定取舍策略的,有下面四種策略可以選擇:

ThreadPoolExecutor.AbortPolicy:直接拋出異常,這是默認策略; 
ThreadPoolExecutor.DiscardPolicy:直接丟棄任務,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後將新來的任務加入等待隊列
ThreadPoolExecutor.CallerRunsPolicy:由線程池所在的線程處理該任務,比如在 main 函數中創建線程池,如果執行此策略,將有 main 線程來執行該任務

雖然並不提倡用 Executors 中的方法來創建線程池,但還是用他們來講一下幾種線程池的原理。

1、newFixedThreadPool

它有兩個重載方法,代碼如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
 }

建立一個線程數量固定的線程池,規定的最大線程數量,超過這個數量之後進來的任務,會放到等待隊列中,如果有空閑線程,則在等待隊列中獲取,遵循先進先出原則。

創建固定線程數量線程池, corePoolSize 和 maximumPoolSize 要一致,即核心線程數和最大線程數(核心+非核心線程)一致,Executors 默認使用的是 LinkedBlockingQueue 作為等待隊列,這是一個無界隊列,這也是使用它的風險所在,除非你能保證提交的任務不會無節制的增長,否則不要使用無界隊列,這樣有可能造成等待隊列無限增加,造成 OOM。

正確的創建固定線程數線程池的做法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

public static ExecutorService createFixedThreadPool() {
        int poolSize = 5;
        int queueSize = 10;
        ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        return executorService;
    }

上面代碼是創建一個 5 個線程的固定數量線程池,這裏線程存活時間沒有作用,所以設置為 0,使用了 ArrayBlockingQueue 作為等待隊列,設置長度為 10 ,最多允許10個等待任務,超過的任務會執行默認的 AbortPolicy 策略,也就是直接拋異常。ThreadFactory 使用了 Guava 庫提供的方法,定義了線程名稱,方便之後排查問題。

2、newSingleThreadExecutor

建立一個只有一個線程的線程池,如果有超過一個任務進來,只有一個可以執行,其余的都會放到等待隊列中,如果有空閑線程,則在等待隊列中獲取,遵循先進先出原則。使用 LinkedBlockingQueue 作為等待隊列。

這個方法同樣存在等待隊列無限長的問題,容易造成 OOM,所以正確的創建方式參考上面固定數量線程池創建的方式,只是把 poolSize 設置為 1 。

3、newCachedThreadPool

緩存型線程池,在核心線程達到最大值之前,有任務進來就會創建新的核心線程,並加入核心線程池,即時有空閑的線程,也不會復用。達到最大核心線程數後,新任務進來,如果有空閑線程,則直接拿來使用,如果沒有空閑線程,則新建臨時線程。並且線程的允許空閑時間都很短,如果超過空閑時間沒有活動,則銷毀臨時線程。關鍵點就在於它使用 SynchronousQueue 作為等待隊列,它不會保留任務,新任務進來後,直接創建臨時線程處理,這樣一來,也就容易造成無限制的創建線程,造成 OOM。

正確的創建緩存型線程池的做法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    public static ExecutorService createCacheThreadPool(){
        int coreSize = 10;
        int maxSize = 20;
        return new ThreadPoolExecutor(coreSize, maxSize, 10L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

4、newScheduledThreadPool

計劃型線程池,可以設置固定時間的延時或者定期執行任務,同樣是看線程池中有沒有空閑線程,如果有,直接拿來使用,如果沒有,則新建線程加入池。使用的是 DelayedWorkQueue 作為等待隊列,這中類型的隊列會保證只有到了指定的延時時間,才會執行任務。

正確的創建緩存型線程池的做法是

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fengzheng" + "-%d").setDaemon(true).build();

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, threadFactory);
        executorService.scheduleAtFixedRate(task,0L,5L, TimeUnit.SECONDS);
        latch.await();
    }

    static class Task implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "executing");
        }
    }

Java 中的幾種線程池這麽用才是對的