詳解 Java 執行緒池
網上講解執行緒池的文章一搜一大把,講解的還不錯的文章仔細找找也是有的。
為了讓您的這兩塊錢花的物有所值,本文一定讓您對執行緒池有一個全新的瞭解,並讓您在今後的工作中,能夠熟練使用執行緒池。
首先說說為什麼用執行緒池?
執行緒的建立 / 銷燬伴隨著系統開銷,過於頻繁的建立 / 銷燬執行緒,會很大程度上影響處理效率。
例如:記建立執行緒消耗時間 T1,執行任務消耗時間 T2,銷燬執行緒消耗時間 T3
如果 T1 + T3 > T2,那麼是不是說開啟一個執行緒來執行這個任務太不划算了! 正好,執行緒池快取執行緒,可用已有的閒置執行緒來執行新任務,避免了 T1 + T3 帶來的系統開銷。
執行緒池種類
再說執行緒池種類之前,先說一下初始化執行緒池的幾個引數,如果這個引數弄明白了,對於執行緒池你就基本上就可以瞭解了。
先來看一下執行緒池的幾個建構函式。
//五個引數的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
//六個引數的建構函式 -1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) //六個引數的建構函式 -2 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
//七個引數的建構函式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize => 該執行緒池中核心執行緒數最大值
1. 核心執行緒
執行緒池新建執行緒的時候,如果當前執行緒總數小於 corePoolSize,則新建的是核心執行緒,如果超過 corePoolSize,則新建的是非核心執行緒
核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹 (閒置狀態)。
如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 這個屬性為 true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉
很好理解吧,正常情況下你不幹活我也養你,因為我總有用到你的時候,但有時候特殊情況 (比如我自己都養不起了),那你不幹活我就要把你幹掉了
2. 執行緒總數
int maximumPoolSize=> 該執行緒池中執行緒總數最大值執行緒總數 = 核心執行緒數 + 非核心執行緒數。
核心執行緒在上面解釋過了,這裡說下非核心執行緒:不是核心執行緒的執行緒(別激動,把刀放下...),其實在上面解釋過了。
3. 超時時間
long keepAliveTime => 該執行緒池中非核心執行緒閒置超時時長 一個非核心執行緒,如果不幹活(閒置狀態)的時長超過這個引數所設定的時長,就會被銷燬掉。 如果設定 allowCoreThreadTimeOut = true,則會作用於核心執行緒
4. 時間單位
TimeUnit unit keepAliveTime 的單位,TimeUnit 是一個 列舉型別,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時
DAYS : 天
5. 佇列
BlockingQueue<Runnable> workQueue 該執行緒池中的任務佇列:維護著等待執行的 Runnable 物件
當所有的核心執行緒都在幹活時,新新增的任務會被新增到這個佇列中等待處理,如果佇列滿了,則新建非核心執行緒執行任務
6. 常用的 workQueue 型別
-
SynchronousQueue:這個佇列接收到任務的時候,會直接提交給執行緒處理,而不保留它,如果所有執行緒都在工作怎麼辦?那就新建一個執行緒來處理這個任務!所以為了保證不出現<執行緒數達到了maximumPoolSize而不能新建執行緒>的錯誤,使用這個型別佇列的時候,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即無限大
-
LinkedBlockingQueue:這個佇列接收到任務的時候,如果當前執行緒數小於核心執行緒數,則新建執行緒(核心執行緒)處理任務;如果當前執行緒數等於核心執行緒數,則進入佇列等待。由於這個佇列沒有最大值限制,即所有超過核心執行緒數的任務都將被新增到佇列中,這也就導致了 maximumPoolSize 的設定失效,因為匯流排程數永遠不會超過 corePoolSize
-
ArrayBlockingQueue:可以限定佇列的長度,接收到任務的時候,如果沒有達到 corePoolSize 的值,則新建執行緒(核心執行緒)執行任務,如果達到了,則入隊等候,如果佇列已滿,則新建執行緒 (非核心執行緒) 執行任務,又如果匯流排程數到了 maximumPoolSize,並且佇列也滿了,則發生錯誤
-
DelayQueue:佇列內元素必須實現 Delayed 介面,這就意味著你傳進去的任務必須先實現 Delayed 介面。這個佇列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務
7. ThreadFactory
建立執行緒的方式,這是一個介面,你 new 他的時候需要實現他的Thread newThread(Runnable r)方法,一般用不上。
小夥伴應該知道 AsyncTask 是對執行緒池的封裝吧?那就直接放一個 AsyncTask 新建執行緒池的 threadFactory 引數原始碼吧:
new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread new Thread(Runnable r) {
return new Thread(r,"AsyncTask#"+mCount.getAndIncrement());
}
}
這麼簡單?就給執行緒起了個名!
8. RejectedExecutionHandler
這玩意兒就是丟擲異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler丟擲異常,你不指定他也有個預設的
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常 ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)。 ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
9. ThreadPoolExecutor 的策略
上面介紹引數的時候其實已經說到了 ThreadPoolExecutor 執行的策略,這裡給總結一下,當一個任務被新增進執行緒池時:
1.執行緒數量未達到corePoolSize,則新建一個執行緒(核心執行緒)執行任務 2.執行緒數量達到了 corePools,則將任務移入佇列等待 3.佇列已滿,新建執行緒(非核心執行緒)執行任務 4.佇列已滿,匯流排程數又達到了 maximumPoolSize,就會由上面那位星期天 (RejectedExecutionHandler) 丟擲異常
常見四種執行緒池
如果你不想自己寫一個執行緒池,那麼你可以從下面看看有沒有符合你要求的 (一般都夠用了),如果有,那麼很好你直接用就行了,如果沒有,那你就老老實實自己去寫一個吧。
Java 通過 Executors 提供了四種執行緒池,這四種執行緒池都是直接或間接配置 ThreadPoolExecutor 的引數實現的,下面我都會貼出這四種執行緒池建構函式的原始碼,各位大佬們一看便知!
CachedThreadPool
優勢:
1.執行緒數無限制
2.有空閒執行緒則複用空閒執行緒,若無空閒執行緒則新建執行緒
3.一定程式減少頻繁建立/銷燬執行緒,減少系統開銷
建立方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
FixedThreadPool
優勢:
1.可控制執行緒最大併發數(同時執行的執行緒數)
2.超出的執行緒會在佇列中等待
建立方法:
//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);
//threadFactory => 建立執行緒的方法,這就是我叫你別理他的那個星期六!你還看!
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
原始碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2個引數的構造方法原始碼,不用我貼你也知道他把星期六放在了哪個位置!所以我就不貼了,省下篇幅給我扯皮
ScheduledThreadPool
支援定時及週期性任務執行。
//建立方法:
//nThreads => 最大執行緒數即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);
//原始碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
SingleThreadExecutor
優勢:
1.有且僅有一個工作執行緒執行任務
2.所有任務按照指定順序執行,即遵循佇列的入隊出隊規則
//建立方法:
ExecutorService singleThreadPool = Executors.newSingleThreadPool();
//原始碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 還有一個Executors.newSingleThreadScheduledExecutor()結合了 3 和 4,就不介紹了,基本不用
準確的指定引數大小
系統負載
引數的設定跟系統的負載有直接的關係,下面為系統負載的相關引數:
- tasks:每秒需要處理的最大任務數量
- tasktime:處理第個任務所需要的時間
- responsetime:系統允許任務最大的響應時間,比如每個任務的響應時間不得超過2秒。
引數設定
corePoolSize:
每個任務需要 tasktime 秒處理,則每個執行緒每鈔可處理 1 / tasktime 個任務。
系統每秒有 tasks 個任務需要處理,則需要的執行緒數為:tasks / ( 1 / tasktime ),即 tasks * tasktime 個執行緒數。
假設系統每秒任務數為 100 ~ 1000,每個任務耗時 0.1 秒,則需要 100 * 0.1 至 1000 * 0.1,即 10 ~ 100 個執行緒。
那麼 corePoolSize 應該設定為大於 10,具體數字最好根據 8020 原則,即 80 % 情況下系統每秒任務數,若系統80%的情況下第秒任務數小於 200,最多時為 1000,則 corePoolSize 可設定為 20。
queueCapacity:
任務佇列的長度要根據核心執行緒數,以及系統對任務響應時間的要求有關。 佇列長度可以設定為 ( corePoolSize / tasktime ) * responsetime : ( 20 / 0.1 ) * 2 = 400,即佇列長度可設定為 400。
佇列長度設定過大,會導致任務響應時間過長,切忌以下寫法:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
這實際上是將佇列長度設定為Integer.MAX_VALUE,將會導致執行緒數量永遠為corePoolSize,再也不會增加,當任務數量陡增時,任務響應時間也將隨之陡增。
maxPoolSize:
當系統負載達到最大值時,核心執行緒數已無法按時處理完所有任務,這時就需要增加執行緒。
每秒 200 個任務需要 20 個執行緒,那麼當每秒達到 1000 個任務時,則需要 ( 1000 - queueCapacity ) * ( 20 / 200 ),即 60 個執行緒,可將 maxPoolSize 設定為 60。
keepAliveTime:
執行緒數量只增加不減少也不行。當負載降低時,可減少執行緒數量,如果一個執行緒空閒時間達到 keepAliveTiime,該執行緒就退出。 預設情況下執行緒池最少會保持 corePoolSize 個執行緒。
allowCoreThreadTimeout:
預設情況下核心執行緒不會退出,可通過將該引數設定為 true,讓核心執行緒也退出。
須知:
以上關於執行緒數量的計算並沒有考慮 CPU 的情況。
若結合 CPU 的情況,比如,當執行緒數量達到 50 時,CPU 達到 100%,則將 maxPoolSize 設定為 60 也不合適,此時若系統負載長時間維持在每秒 1000個 任務,則超出執行緒池處理能力,應設法降低每個任務的處理時間 ( tasktime )。
在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,(除非呼叫了 prestartAllCoreThreads () 或者 prestartCoreThread () 方法,從這 2 個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立 corePoolSize 個執行緒或者一個執行緒)。
執行緒池監控
線上程池中,執行緒池的監控也是很重要的一個點。
我們經常會注意的點有當前的排隊執行緒池數,當前的活動執行緒數,執行完成執行緒數,匯流排程數。
private static ExecutorService executor = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100000));
public static void main(String[] args) throws Exception {
for (int i = 0; i < 100000; i++) {
executor.execute(() -> {
System.out.print(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) executor);
while (true) {
System.out.println();
int queueSize = tpe.getQueue().size();
System.out.println("當前排隊執行緒數:" + queueSize);
int activeCount = tpe.getActiveCount();
System.out.println("當前活動執行緒數:" + activeCount);
long completedTaskCount = tpe.getCompletedTaskCount();
System.out.println("執行完成執行緒數:" + completedTaskCount);
long taskCount = tpe.getTaskCount();
System.out.println("匯流排程數:" + taskCount);
Thread.sleep(3000);
}
}