1. 程式人生 > >多線程學習總結之 線程池

多線程學習總結之 線程池

code service pla ever pro olt def 原理圖 cte

前言:

   本文基於jdk1.8。 前段時間換工作,面試時候每次都會問線程的問題,自己對多線程方面的知識沒有花時間研究過,所以一問到線程就懵了,最近特地買了方騰飛老師的《Java並發編程的藝術》這本書學學這方面的知識。這篇隨筆主要是我對線程池學習的總結,如有寫的不好或不對的地方歡迎指出!

1、線程池的基本概念

  線程池可以理解為一種管理線程的容器,是由我們根據自己的需求創建出來的。使用線程池可以降低系統資源開銷、提高響應速度並幫我們管理線程。

2、線程池的主要參數

  int corePoolSize:核心池大小,線程池正常保持存活的線程數,默認情況下,當我們創建一個線程池,它不會立刻創建線程,而是等到有任務提交時才會創建,當然我們也可以調用線程池的prestartAllCoreThreads()方法,讓線程池在創建時就創建corePoolSize數目的線程;

  int maximuxPoolSize:最大線程池大小,線程池所允許創建的最大線程數;

  long keepAliveTime:線程存活時間,當線程池中的線程數量大於核心池大小後,多出來的線程在空閑時間達到keepAliveTime後會被中斷,如果任務比較多,並且每個任務執行時間短,那可以調大這個參數,以提高線程的利用率;

  TimeUnit timeUnit:keepAliveTime的單位,值有:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS(毫秒)、MICROSECONDS(微秒)、NANOSECONDS(納秒);

  BlockingQueue workQueue:

任務隊列,主要實現類有:

    1)、LinkedBlockingQueue:基於鏈表的無界(最大值為Integer.MAX_VALUE)阻塞隊列,按FIFO(先進先出)的規則對任務進行排序,使用了此隊列的線程池中maximuxPoolSize和keepAliveTime這兩個參數就沒有意義了(原因下文解釋);

    2)、ArrayBlockingQueue:基於數組的有界阻塞隊列,按FIFO的規則對任務進行排序,可傳入參數來自定義隊列大小;

    3)、DelayedWorkQueue:基於堆的延遲隊列,靜態工廠Executors.newScheduledThreadPool(...)中使用了該隊列;

    4)、PriorityBlockingQueue:具有優先級的阻塞隊列;

    5)、SynchronousQueue:不存儲任務的阻塞隊列,每一個插入對應一個取出。

    吞吐量:SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue

  ThreadFactory threadFactory:線程工廠,用來創建線程,可以通過線程工廠給新創建的線程設置更合理的名字、設置優先級等;

  RejectedExecutionHandler handler:拒絕任務的接口處理器;

    拒絕策略有:a、AbortPolicy:拒絕任務並拋出異常,默認的策略;

          b、DiscardPolicy:直接拒絕不拋出異常;

          c、DiscardOldestPolicy:丟棄隊列中最遠的一個任務(最先進入隊列的,FIFO),並執行當前任務;

          d、CallerRunsPolicy:只用調用者所在的線程來執行任務,不管其他線程的事。技術分享圖片

          e、當然也可以自定義拒絕策略,來處理如記錄日誌、持久化等已有拒絕策略不能實現的功能,需實現RejectedExecutionHandler接口,重寫rejectedExecution()方法。

3、線程池的工作原理

  線程池通過調用execute()方法工作,當然也可以調用submit()方法,主要區別是submit()方法可以返回任務執行的結果future對象,而execute()沒有返回值,execute()方法的源碼如下:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn‘t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 1、如果當前線程數小於核心池,則創建線程並執行當前任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 2、如果條件1不滿足則將任務放進任務隊列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) // 如果線程池不處於運行狀態,則拒絕
                reject(command);
            else if (workerCountOf(recheck) == 0) // 3、如果線程數沒超過最大池數則創建線程並執行任務
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 4、如果任務無法放入隊列,則拒絕
            reject(command); // 拒絕
    }

線程池工作原理圖:
技術分享圖片

以上源碼中1、2 、3 註釋就對應線程池工作原理圖中的1、2、3步的判斷。

  這裏註意,當線程池中的線程數量小於核心池量,並且這時線程池中還有空閑線程(之前執行任務的線程已經完成工作了),如果這時候有任務提交還是會創建新線程,因為execute()方法中只要當前線程池中線程數量小於核心池就調用addWorker()創建線程執行當前任務,這個似乎有一點不合理,不知 Doug Lea大神以後會不會改進。

  下面舉個小例子來和線程池工作原理比較一下:有一個小工廠,最多能容納20(maximumPoolSize)個工人(線程)幹活,目前老板只招了10(corePoolSize)個工人,老板規定不管有沒有活都要來上班,活不多時候可以一 部分人幹 一部分人歇著,反正都是老員工老板養的起(核心池中一部分線程空閑,但不會被中斷),工廠還有個小倉庫(任務隊列BlockingQueue),有時候活多了幹不完,原料(任務)就堆到倉庫裏,倉庫要是堆滿了,老板就想辦法了,由於老板比較摳門,就招了5個零時工(大於corePoolSize那部分),這批活做的差不多了,老板不想多養幾個閑人就辭掉3個零時工(空閑線程達到設定的存活時間,中斷),這時又來了一批活,量很大,於是老板又招了8個零時工,這時工廠的工位滿了(線程數達到 maximumPoolSize),現在再有活來老板就拒絕了(RejectedExecutionHandler)。

  在介紹線程池參數時有說過如果任務隊列是LinkedBlockingQueue,線程池大小和存活時間這兩個參數就失效了,這裏如果工廠的倉庫是無限容量的,老板就不用擔心活幹不完啦,幹不完的活直接扔進倉庫就好了,並且老板還可以根據客戶要求的期限對任務進行排序,這樣就不用再招零時工,自然也沒有辭退空閑零時工的事了。

4、常用線程池

  1)、FixedThreadPool   

    固定大小的線程池,它的核心池數和最大線程池數都是傳入的參數的值,存活時間為0,即無任務時立即中斷,任務隊列是 LinkedBlockingQueue。

    優點:可控制並發數量,多出新近的任務會在隊列中等待,任務可以無限多。

    缺點:線程池大小固定,隨著業務量的變化,改起來不方便,但可以寫在配置文件裏。

    適用場景:一定數量的任務,執行所需時間長,為了滿足管理資源的需求,而需要限制當前線程數量的應用場景,它適用於負載較輕的服務器。

源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
使用示例:
public static void fixedThreadPoolTest(){
    ExecutorService fixdThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0;i < 10; i++){
        int index = i;
        fixdThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
    }
}
運行結果:
pool-1-thread-2:1
pool-1-thread-3:2
pool-1-thread-1:0
pool-1-thread-3:4
pool-1-thread-2:3
pool-1-thread-3:6
pool-1-thread-1:5
pool-1-thread-3:8
pool-1-thread-2:7
pool-1-thread-1:9

  2)、CachedTheadPool

    可緩存的線程池,核心池為0,最大線程池數為Integer.MAX_VALUE,空閑線程的存活時間60秒,任務隊列是SynchronousQueue。

    優點:可根據需要靈活創建線程數量,空閑60秒就中斷,節約系統資源。

    缺點:若使用場景不當,如任務很少,偶爾(60秒以上)來一個任務,那就每次都需要創建線程,這樣就很消耗系統資源。

    適用場景:適用於執行大量短期異步任務或者負載較輕的服務器。

源碼:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
使用示例:
public static void cachedThreadPoolTest(){
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0;i < 10; i++){
        int index = i;
        try {
            Thread.sleep(index*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
    }
}

運行結果:
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9

  3)、SingleThreadExecutor

    只有一個線程的線程池,核心池和最大線程池大小都是1,空閑線程存活時間是無意義的參數,任務隊列是LinkedBlockingQueue。

    優點:線程池中有且只有一個線程一直存在著,任務按順序執行,後來的任務在隊列裏排隊等待。

    缺點:不適合並發場景。

    適用場景:任務需要按順序並且無並發的執行。

源碼:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                      0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用示例:
public static void singleThreadExecutorTest(){
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0;i < 10; i++){
        int index = i;
        singleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index));
    }
}
運行結果:
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9

  4)、ScheduledThreadPool

    可執行定時或周期性任務的線程池,核心池為傳入的參數值,最大線程池為Integer.MAX_VALUE,空閑線程存活時間為0,任務隊列為DelayedWorkQueue。

    優點:可執行定時和周期性任務,書上說比Timer效果好,有時間測一下。

    缺點:暫時沒想到。

    適用場景:有定時、周期性批量任務需求時,如銀行批量代收付交易、處理對賬、批量放款等。

源碼:
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
使用示例:
public static void scheduledThreadPoolTest(){
   ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(3);
     scheduledExecutorPool.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()+
       ":delay 1 seconds,and execute every 3 seconds"),1,3,TimeUnit.SECONDS);
}
運行結果: pool
-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds

  5)、自定義線程池ThreadLocalExecutor

技術分享圖片

    如果上述四種由Executors工廠類提供的常用的線程池滿足不了你的業務需求,你可以自定義ThreadPoolExecutor,每個參數都可以按照你的需要設置。

源碼:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } 使用示例: public static void threadPoolExecutorTest(){ int corePoolSize = 3, maximumPoolSize = 5; long keepAliveTime = 1; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(1); RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> System.out.println("其實我是拒絕的"); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,unit,workQueue,rejectedExecutionHandler); for (int i = 0; i < 10; i++){ int index = i; threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index)); } } 運行結果: pool-1-thread-2:1 pool-1-thread-4:4 pool-1-thread-3:2 其實我是拒絕的 pool-1-thread-1:0 pool-1-thread-1:7 pool-1-thread-5:5 pool-1-thread-2:3 其實我是拒絕的 pool-1-thread-4:8

5、合理線程池的參數

  1)、CPU密集型任務(如壓縮和解壓縮,這種需要CPU不停的計算的任務)應配置盡可能小的線程,如配置CPU個數+1 數量的線程池;

  2)、IO密集型任務,線程並不是一直在執行任務,則應配置盡可能多的線程,如2倍的CPU數;

  3)、混合性任務,如果可以拆分,將器拆分成一個CPU密集性任務和一個IO密集型任務,只要這兩個任務執行時間相差不是太大,那麽分解後執行的吞吐量將高於串行執行的吞吐量。如果兩個任務執行時間相差很大就沒必要進行拆分了;

  4)、優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理,它可以讓優先級高的任務先執行;

  5)、執行時間不同的任務可以交給不同規模的線程池來處理,或者可以適用優先級隊列,讓執行時間段的任務先執行;

  6)、是否依賴其他系統資源,如依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那麽線程數應該設置的越大,這樣才能更好的適用CPU;

  7)、盡量使用有界隊列,因為有界隊列可以增加系統的穩定性和預警能力(無界隊列可能會因為任務太多積壓在隊列裏而撐滿內存,導致系統癱瘓),可以根據需要將隊列設大一點,比如幾千。

6、線程池的關閉  

  1)、shutdown() 將線程池的狀態置為SHUTDOWN,線程池會將空閑的線程調用它的interrupt()進行中斷,還在排隊的任務取消,然後等待正在執行任務的線程執行完成後銷毀所有線程;

  2)、shutdownNow() 將線程池的狀態置為STOP, 然後遍歷線程池中所有的線程,並逐個調用它們的interrupt()方法進行中斷正在執行任務或者暫停的線程,並返回還在排隊的任務列表。

參考資料:1、《Java並發編程的藝術》

     2、https://www.cnblogs.com/dolphin0520/p/3932921.html

     3、https://juejin.im/post/5b3cf259e51d45194e0b7204

多線程學習總結之 線程池