多線程學習總結之 線程池
前言:
本文基於jdk1.8。 前段時間換工作,面試時候每次都會問線程的問題,自己對多線程方面的知識沒有花時間研究過,所以一問到線程就懵了,最近特地買了方騰飛老師的《Java並發編程的藝術》這本書學學這方面的知識。這篇隨筆主要是我對線程池學習的總結,如有寫的不好或不對的地方歡迎指出!
1、線程池的基本概念
線程池可以理解為一種管理線程的容器,是由我們根據自己的需求創建出來的。使用線程池可以降低系統資源開銷、提高響應速度並幫我們管理線程。
2、線程池的主要參數
int corePoolSize:核心池大小,線程池正常保持存活的線程數,默認情況下,當我們創建一個線程池,它不會立刻創建線程,而是等到有任務提交時才會創建,當然我們也可以調用線程池的prestartAllCoreThreads()方法,讓線程池在創建時就創建corePoolSize數目的線程;
int maximuxPoolSize:最大線程池大小,線程池所允許創建的最大線程數;
long keepAliveTime:線程存活時間,當線程池中的線程數量大於核心池大小後,多出來的線程在空閑時間達到keepAliveTime後會被中斷,如果任務比較多,並且每個任務執行時間短,那可以調大這個參數,以提高線程的利用率;
TimeUnit timeUnit:keepAliveTime的單位,值有:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS(毫秒)、MICROSECONDS(微秒)、NANOSECONDS(納秒);
BlockingQueue workQueue:
1)、LinkedBlockingQueue:基於鏈表的無界(最大值為Integer.MAX_VALUE)阻塞隊列,按FIFO(先進先出)的規則對任務進行排序,使用了此隊列的線程池中maximuxPoolSize和keepAliveTime這兩個參數就沒有意義了(原因下文解釋);
2)、ArrayBlockingQueue:基於數組的有界阻塞隊列,按FIFO的規則對任務進行排序,可傳入參數來自定義隊列大小;
3)、DelayedWorkQueue:基於堆的延遲隊列,靜態工廠Executors.newScheduledThreadPool(...)中使用了該隊列;
4)、PriorityBlockingQueue:具有優先級的阻塞隊列;
5)、SynchronousQueue:不存儲任務的阻塞隊列,每一個插入對應一個取出。
吞吐量:SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue
ThreadFactory threadFactory:線程工廠,用來創建線程,可以通過線程工廠給新創建的線程設置更合理的名字、設置優先級等;
RejectedExecutionHandler handler:拒絕任務的接口處理器;
拒絕策略有:a、AbortPolicy:拒絕任務並拋出異常,默認的策略;
b、DiscardPolicy:直接拒絕不拋出異常;
c、DiscardOldestPolicy:丟棄隊列中最遠的一個任務(最先進入隊列的,FIFO),並執行當前任務;
d、CallerRunsPolicy:只用調用者所在的線程來執行任務,不管其他線程的事。
e、當然也可以自定義拒絕策略,來處理如記錄日誌、持久化等已有拒絕策略不能實現的功能,需實現RejectedExecutionHandler接口,重寫rejectedExecution()方法。
3、線程池的工作原理
線程池通過調用execute()方法工作,當然也可以調用submit()方法,主要區別是submit()方法可以返回任務執行的結果future對象,而execute()沒有返回值,execute()方法的源碼如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn‘t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1、如果當前線程數小於核心池,則創建線程並執行當前任務 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 2、如果條件1不滿足則將任務放進任務隊列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 如果線程池不處於運行狀態,則拒絕 reject(command); else if (workerCountOf(recheck) == 0) // 3、如果線程數沒超過最大池數則創建線程並執行任務 addWorker(null, false); } else if (!addWorker(command, false)) // 4、如果任務無法放入隊列,則拒絕 reject(command); // 拒絕 }
線程池工作原理圖:
以上源碼中1、2 、3 註釋就對應線程池工作原理圖中的1、2、3步的判斷。
這裏註意,當線程池中的線程數量小於核心池量,並且這時線程池中還有空閑線程(之前執行任務的線程已經完成工作了),如果這時候有任務提交還是會創建新線程,因為execute()方法中只要當前線程池中線程數量小於核心池就調用addWorker()創建線程執行當前任務,這個似乎有一點不合理,不知 Doug Lea大神以後會不會改進。
下面舉個小例子來和線程池工作原理比較一下:有一個小工廠,最多能容納20(maximumPoolSize)個工人(線程)幹活,目前老板只招了10(corePoolSize)個工人,老板規定不管有沒有活都要來上班,活不多時候可以一 部分人幹 一部分人歇著,反正都是老員工老板養的起(核心池中一部分線程空閑,但不會被中斷),工廠還有個小倉庫(任務隊列BlockingQueue),有時候活多了幹不完,原料(任務)就堆到倉庫裏,倉庫要是堆滿了,老板就想辦法了,由於老板比較摳門,就招了5個零時工(大於corePoolSize那部分),這批活做的差不多了,老板不想多養幾個閑人就辭掉3個零時工(空閑線程達到設定的存活時間,中斷),這時又來了一批活,量很大,於是老板又招了8個零時工,這時工廠的工位滿了(線程數達到 maximumPoolSize),現在再有活來老板就拒絕了(RejectedExecutionHandler)。
在介紹線程池參數時有說過如果任務隊列是LinkedBlockingQueue,線程池大小和存活時間這兩個參數就失效了,這裏如果工廠的倉庫是無限容量的,老板就不用擔心活幹不完啦,幹不完的活直接扔進倉庫就好了,並且老板還可以根據客戶要求的期限對任務進行排序,這樣就不用再招零時工,自然也沒有辭退空閑零時工的事了。
4、常用線程池
1)、FixedThreadPool
固定大小的線程池,它的核心池數和最大線程池數都是傳入的參數的值,存活時間為0,即無任務時立即中斷,任務隊列是 LinkedBlockingQueue。
優點:可控制並發數量,多出新近的任務會在隊列中等待,任務可以無限多。
缺點:線程池大小固定,隨著業務量的變化,改起來不方便,但可以寫在配置文件裏。
適用場景:一定數量的任務,執行所需時間長,為了滿足管理資源的需求,而需要限制當前線程數量的應用場景,它適用於負載較輕的服務器。
源碼: public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } 使用示例: public static void fixedThreadPoolTest(){ ExecutorService fixdThreadPool = Executors.newFixedThreadPool(3); for (int i = 0;i < 10; i++){ int index = i; fixdThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index)); } } 運行結果: pool-1-thread-2:1 pool-1-thread-3:2 pool-1-thread-1:0 pool-1-thread-3:4 pool-1-thread-2:3 pool-1-thread-3:6 pool-1-thread-1:5 pool-1-thread-3:8 pool-1-thread-2:7 pool-1-thread-1:9
2)、CachedTheadPool
可緩存的線程池,核心池為0,最大線程池數為Integer.MAX_VALUE,空閑線程的存活時間60秒,任務隊列是SynchronousQueue。
優點:可根據需要靈活創建線程數量,空閑60秒就中斷,節約系統資源。
缺點:若使用場景不當,如任務很少,偶爾(60秒以上)來一個任務,那就每次都需要創建線程,這樣就很消耗系統資源。
適用場景:適用於執行大量短期異步任務或者負載較輕的服務器。
源碼: public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); } 使用示例: public static void cachedThreadPoolTest(){ ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0;i < 10; i++){ int index = i; try { Thread.sleep(index*1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index)); } } 運行結果: pool-1-thread-1:0 pool-1-thread-1:1 pool-1-thread-1:2 pool-1-thread-1:3 pool-1-thread-1:4 pool-1-thread-1:5 pool-1-thread-1:6 pool-1-thread-1:7 pool-1-thread-1:8 pool-1-thread-1:9
3)、SingleThreadExecutor
只有一個線程的線程池,核心池和最大線程池大小都是1,空閑線程存活時間是無意義的參數,任務隊列是LinkedBlockingQueue。
優點:線程池中有且只有一個線程一直存在著,任務按順序執行,後來的任務在隊列裏排隊等待。
缺點:不適合並發場景。
適用場景:任務需要按順序並且無並發的執行。
源碼: public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } 使用示例: public static void singleThreadExecutorTest(){ ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0;i < 10; i++){ int index = i; singleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index)); } } 運行結果: pool-1-thread-1:0 pool-1-thread-1:1 pool-1-thread-1:2 pool-1-thread-1:3 pool-1-thread-1:4 pool-1-thread-1:5 pool-1-thread-1:6 pool-1-thread-1:7 pool-1-thread-1:8 pool-1-thread-1:9
4)、ScheduledThreadPool
可執行定時或周期性任務的線程池,核心池為傳入的參數值,最大線程池為Integer.MAX_VALUE,空閑線程存活時間為0,任務隊列為DelayedWorkQueue。
優點:可執行定時和周期性任務,書上說比Timer效果好,有時間測一下。
缺點:暫時沒想到。
適用場景:有定時、周期性批量任務需求時,如銀行批量代收付交易、處理對賬、批量放款等。
源碼: public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); } 使用示例: public static void scheduledThreadPoolTest(){ ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(3); scheduledExecutorPool.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()+ ":delay 1 seconds,and execute every 3 seconds"),1,3,TimeUnit.SECONDS); }
運行結果: pool-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds
5)、自定義線程池ThreadLocalExecutor
如果上述四種由Executors工廠類提供的常用的線程池滿足不了你的業務需求,你可以自定義ThreadPoolExecutor,每個參數都可以按照你的需要設置。
源碼:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } 使用示例: public static void threadPoolExecutorTest(){ int corePoolSize = 3, maximumPoolSize = 5; long keepAliveTime = 1; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(1); RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> System.out.println("其實我是拒絕的"); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,unit,workQueue,rejectedExecutionHandler); for (int i = 0; i < 10; i++){ int index = i; threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index)); } } 運行結果: pool-1-thread-2:1 pool-1-thread-4:4 pool-1-thread-3:2 其實我是拒絕的 pool-1-thread-1:0 pool-1-thread-1:7 pool-1-thread-5:5 pool-1-thread-2:3 其實我是拒絕的 pool-1-thread-4:8
5、合理線程池的參數
1)、CPU密集型任務(如壓縮和解壓縮,這種需要CPU不停的計算的任務)應配置盡可能小的線程,如配置CPU個數+1 數量的線程池;
2)、IO密集型任務,線程並不是一直在執行任務,則應配置盡可能多的線程,如2倍的CPU數;
3)、混合性任務,如果可以拆分,將器拆分成一個CPU密集性任務和一個IO密集型任務,只要這兩個任務執行時間相差不是太大,那麽分解後執行的吞吐量將高於串行執行的吞吐量。如果兩個任務執行時間相差很大就沒必要進行拆分了;
4)、優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理,它可以讓優先級高的任務先執行;
5)、執行時間不同的任務可以交給不同規模的線程池來處理,或者可以適用優先級隊列,讓執行時間段的任務先執行;
6)、是否依賴其他系統資源,如依賴數據庫連接池的任務,因為線程提交SQL後需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那麽線程數應該設置的越大,這樣才能更好的適用CPU;
7)、盡量使用有界隊列,因為有界隊列可以增加系統的穩定性和預警能力(無界隊列可能會因為任務太多積壓在隊列裏而撐滿內存,導致系統癱瘓),可以根據需要將隊列設大一點,比如幾千。
6、線程池的關閉
1)、shutdown() 將線程池的狀態置為SHUTDOWN,線程池會將空閑的線程調用它的interrupt()進行中斷,還在排隊的任務取消,然後等待正在執行任務的線程執行完成後銷毀所有線程;
2)、shutdownNow() 將線程池的狀態置為STOP, 然後遍歷線程池中所有的線程,並逐個調用它們的interrupt()方法進行中斷正在執行任務或者暫停的線程,並返回還在排隊的任務列表。
參考資料:1、《Java並發編程的藝術》
2、https://www.cnblogs.com/dolphin0520/p/3932921.html
3、https://juejin.im/post/5b3cf259e51d45194e0b7204
多線程學習總結之 線程池