跟著阿里p7一起學java高併發 - 第18天:玩轉java執行緒池,這一篇就夠了
java中的執行緒池,這一篇就夠了
java高併發系列第18篇文章。
本文主要內容
- 什麼是執行緒池
- 執行緒池實現原理
- 執行緒池中常見的各種佇列
- 自定義執行緒建立的工廠
- 常見的飽和策略
- 自定義飽和策略
- 執行緒池中兩種關閉方法有何不同
- 擴充套件執行緒池
- 合理地配置執行緒池
- 執行緒池中執行緒數量的配置
什麼是執行緒池
大家用jdbc操作過資料庫應該知道,操作資料庫需要和資料庫建立連線,拿到連線之後才能操作資料庫,用完之後銷燬。資料庫連線的建立和銷燬其實是比較耗時的,真正和業務相關的操作耗時是比較短的。每個資料庫操作之前都需要建立連線,為了提升系統性能,後來出現了資料庫連線池,系統啟動的時候,先建立很多連線放在池子裡面,使用的時候,直接從連線池中獲取一個,使用完畢之後返回到池子裡面,繼續給其他需要者使用,這其中就省去建立連線的時間,從而提升了系統整體的效能。
執行緒池和資料庫連線池的原理也差不多,建立執行緒去處理業務,可能建立執行緒的時間比處理業務的時間還長一些,如果系統能夠提前為我們建立好執行緒,我們需要的時候直接拿來使用,用完之後不是直接將其關閉,而是將其返回到執行緒中中,給其他需要這使用,這樣直接節省了建立和銷燬的時間,提升了系統的效能。
簡單的說,在使用了執行緒池之後,建立執行緒變成了從執行緒池中獲取一個空閒的執行緒,然後使用,關閉執行緒變成了將執行緒歸還到執行緒池。
執行緒池實現原理
當向執行緒池提交一個任務之後,執行緒池的處理流程如下:
- 判斷是否達到核心執行緒數,若未達到,則直接建立新的執行緒處理當前傳入的任務,否則進入下個流程
- 執行緒池中的工作佇列是否已滿,若未滿,則將任務丟入工作佇列中先存著等待處理,否則進入下個流程
- 是否達到最大執行緒數,若未達到,則建立新的執行緒處理當前傳入的任務,否則交給執行緒池中的飽和策略進行處理。
流程如下圖:
舉個例子,加深理解:
咱們作為開發者,上面都有開發主管,主管下面帶領幾個小弟幹活,CTO給主管授權說,你可以招聘5個小弟幹活,新來任務,如果小弟還不到吳哥,立即去招聘一個來幹這個新來的任務,當5個小弟都招來了,再來任務之後,將任務記錄到一個表格中,表格中最多記錄100個,小弟們會主動去表格中獲取任務執行,如果5個小弟都在幹活,並且表格中也記錄滿了,那你可以將小弟擴充到20個,如果20個小弟都在幹活,並且存放任務的表也滿了,產品經理再來任務後,是直接拒絕,還是讓產品自己幹,這個由你自己決定,小弟們都盡心盡力在幹活,任務都被處理完了,突然公司業績下滑,幾個員工沒事幹,打醬油,為了節約成本,CTO主管把小弟控制到5人,其他15個人直接被幹掉了。所以作為小弟們,別讓自己閒著,多幹活。
原理:先找幾個人幹活,大家都忙於幹活,任務太多可以排期,排期的任務太多了,再招一些人來幹活,最後幹活的和排期都達到上層領導要求的上限了,那需要採取一些其他策略進行處理了。對於長時間不幹活的人,考慮將其開掉,節約資源和成本。
java中的執行緒池
jdk中提供了執行緒池的具體實現,實現類是:java.util.concurrent.ThreadPoolExecutor
,主要構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心執行緒大小,當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使有其他空閒執行緒可以處理任務也會創新執行緒,等到工作的執行緒數大於核心執行緒數時就不會在建立了。如果呼叫了執行緒池的prestartAllCoreThreads
方法,執行緒池會提前把核心執行緒都創造好,並啟動
maximumPoolSize:執行緒池允許建立的最大執行緒數。如果佇列滿了,並且以建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。如果我們使用了無界佇列,那麼所有的任務會加入佇列,這個引數就沒有什麼效果了
keepAliveTime:執行緒池的工作執行緒空閒後,保持存活的時間。如果沒有任務處理了,有些執行緒會空閒,空閒的時間超過了這個值,會被回收掉。如果任務很多,並且每個任務的執行時間比較短,避免執行緒重複建立和回收,可以調大這個時間,提高執行緒的利用率
unit:keepAliveTIme的時間單位,可以選擇的單位有天、小時、分鐘、毫秒、微妙、千分之一毫秒和納秒。型別是一個列舉java.util.concurrent.TimeUnit
,這個列舉也經常使用,有興趣的可以看一下其原始碼
workQueue:工作佇列,用於快取待處理任務的阻塞佇列,常見的有4種,本文後面有介紹
threadFactory:執行緒池中建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字
handler:飽和策略,當執行緒池無法處理新來的任務了,那麼需要提供一種策略處理提交的新任務,預設有4種策略,文章後面會提到
呼叫執行緒池的execute方法處理任務,執行execute方法的過程:
- 判斷執行緒池中執行的執行緒數是否小於corepoolsize,是:則建立新的執行緒來處理任務,否:執行下一步
- 試圖將任務新增到workQueue指定的佇列中,如果無法新增到佇列,進入下一步
- 判斷執行緒池中執行的執行緒數是否小於
maximumPoolSize
,是:則新增執行緒處理當前傳入的任務,否:將任務傳遞給handler
物件rejectedExecution
方法處理
執行緒池的使用步驟:
呼叫構造方法建立執行緒池
- 呼叫執行緒池的方法處理任務
關閉執行緒池
執行緒池使用的簡單示例
上一個簡單的示例,如下:
package com.itsoku.chat16;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 跟著阿里p7學併發,微信公眾號:javacode2018
*/
public class Demo1 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
int j = i;
String taskName = "任務" + j;
executor.execute(() -> {
//模擬任務內部處理耗時
try {
TimeUnit.SECONDS.sleep(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + taskName + "處理完畢");
});
}
//關閉執行緒池
executor.shutdown();
}
}
輸出:
pool-1-thread-1任務0處理完畢
pool-1-thread-2任務1處理完畢
pool-1-thread-3任務2處理完畢
pool-1-thread-1任務3處理完畢
pool-1-thread-2任務4處理完畢
pool-1-thread-3任務5處理完畢
pool-1-thread-1任務6處理完畢
pool-1-thread-2任務7處理完畢
pool-1-thread-3任務8處理完畢
pool-1-thread-1任務9處理完畢
執行緒池中常見5種工作佇列
任務太多的時候,工作佇列用於暫時快取待處理的任務,jdk中常見的5種阻塞佇列:
ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按照先進先出原則對元素進行排序
LinkedBlockingQueue:是一個基於連結串列結構的阻塞佇列,此佇列按照先進先出排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool
使用了這個佇列。
SynchronousQueue :一個不儲存元素的阻塞佇列,每個插入操作必須等到另外一個執行緒呼叫移除操作,否則插入操作一直處理阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool
使用這個佇列
PriorityBlockingQueue:優先順序佇列,進入佇列的元素按照優先順序會進行排序
前2種佇列相關示例就不說了,主要說一下後面2種佇列的使用示例。
SynchronousQueue佇列的執行緒池
package com.itsoku.chat16;
import java.util.concurrent.*;
/**
* 跟著阿里p7學併發,微信公眾號:javacode2018
*/
public class Demo2 {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 50; i++) {
int j = i;
String taskName = "任務" + j;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "處理" + taskName);
//模擬任務內部處理耗時
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
pool-1-thread-1處理任務0
pool-1-thread-2處理任務1
pool-1-thread-3處理任務2
pool-1-thread-6處理任務5
pool-1-thread-7處理任務6
pool-1-thread-4處理任務3
pool-1-thread-5處理任務4
pool-1-thread-8處理任務7
pool-1-thread-9處理任務8
pool-1-thread-10處理任務9
pool-1-thread-11處理任務10
pool-1-thread-12處理任務11
pool-1-thread-13處理任務12
pool-1-thread-14處理任務13
pool-1-thread-15處理任務14
pool-1-thread-16處理任務15
pool-1-thread-17處理任務16
pool-1-thread-18處理任務17
pool-1-thread-19處理任務18
pool-1-thread-20處理任務19
pool-1-thread-21處理任務20
pool-1-thread-25處理任務24
pool-1-thread-24處理任務23
pool-1-thread-23處理任務22
pool-1-thread-22處理任務21
pool-1-thread-26處理任務25
pool-1-thread-27處理任務26
pool-1-thread-28處理任務27
pool-1-thread-30處理任務29
pool-1-thread-29處理任務28
pool-1-thread-31處理任務30
pool-1-thread-32處理任務31
pool-1-thread-33處理任務32
pool-1-thread-38處理任務37
pool-1-thread-35處理任務34
pool-1-thread-36處理任務35
pool-1-thread-41處理任務40
pool-1-thread-34處理任務33
pool-1-thread-39處理任務38
pool-1-thread-40處理任務39
pool-1-thread-37處理任務36
pool-1-thread-42處理任務41
pool-1-thread-43處理任務42
pool-1-thread-45處理任務44
pool-1-thread-46處理任務45
pool-1-thread-44處理任務43
pool-1-thread-47處理任務46
pool-1-thread-50處理任務49
pool-1-thread-48處理任務47
pool-1-thread-49處理任務48
程式碼中使用Executors.newCachedThreadPool()
建立執行緒池,看一下的原始碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從輸出中可以看出,系統建立了50個執行緒處理任務,程式碼中使用了SynchronousQueue
同步佇列,這種佇列比較特殊,放入元素必須要有另外一個執行緒去獲取這個元素,否則放入元素會失敗或者一直阻塞在那裡直到有執行緒取走,示例中任務處理休眠了指定的時間,導致已建立的工作執行緒都忙於處理任務,所以新來任務之後,將任務丟入同步佇列會失敗,丟入佇列失敗之後,會嘗試新建執行緒處理任務。使用上面的方式建立執行緒池需要注意,如果需要處理的任務比較耗時,會導致新來的任務都會建立新的執行緒進行處理,可能會導致建立非常多的執行緒,最終耗盡系統資源,觸發OOM。
PriorityBlockingQueue優先順序佇列的執行緒池
package com.itsoku.chat16;
import java.util.concurrent.*;
/**
* 跟著阿里p7學併發,微信公眾號:javacode2018
*/
public class Demo3 {
static class Task implements Runnable, Comparable<Task> {
private int i;
private String name;
public Task(int i, String name) {
this.i = i;
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
}
@Override
public int compareTo(Task o) {
return Integer.compare(o.i, this.i);
}
}
public static void main(String[] args) {
ExecutorService executor = new ThreadPoolExecutor(1, 1,
60L, TimeUnit.SECONDS,
new PriorityBlockingQueue());
for (int i = 0; i < 10; i++) {
String taskName = "任務" + i;
executor.execute(new Task(i, taskName));
}
for (int i = 100; i >= 90; i--) {
String taskName = "任務" + i;
executor.execute(new Task(i, taskName));
}
executor.shutdown();
}
}
輸出:
pool-1-thread-1處理任務0
pool-1-thread-1處理任務100
pool-1-thread-1處理任務99
pool-1-thread-1處理任務98
pool-1-thread-1處理任務97
pool-1-thread-1處理任務96
pool-1-thread-1處理任務95
pool-1-thread-1處理任務94
pool-1-thread-1處理任務93
pool-1-thread-1處理任務92
pool-1-thread-1處理任務91
pool-1-thread-1處理任務90
pool-1-thread-1處理任務9
pool-1-thread-1處理任務8
pool-1-thread-1處理任務7
pool-1-thread-1處理任務6
pool-1-thread-1處理任務5
pool-1-thread-1處理任務4
pool-1-thread-1處理任務3
pool-1-thread-1處理任務2
pool-1-thread-1處理任務1
輸出中,除了第一個任務,其他任務按照優先順序高低按順序處理。原因在於:建立執行緒池的時候使用了優先順序佇列,進入佇列中的任務會進行排序,任務的先後順序由Task中的i變數決定。向PriorityBlockingQueue
加入元素的時候,內部會呼叫程式碼中Task的compareTo
方法決定元素的先後順序。
自定義建立執行緒的工廠
給執行緒池中執行緒起一個有意義的名字,在系統出現問題的時候,通過執行緒堆疊資訊可以更容易發現系統中問題所在。自定義建立工廠需要實現java.util.concurrent.ThreadFactory
介面中的Thread newThread(Runnable r)
方法,引數為傳入的任務,需要返回一個工作執行緒。
示例程式碼:
package com.itsoku.chat16;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <b>description</b>: <br>
* <b>time</b>:2019/7/28 21:01 <br>
* <b>author</b>:ready [email protected]
*/
public class Demo4 {
static AtomicInteger threadNum = new AtomicInteger(1);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10), r -> {
Thread thread = new Thread(r);
thread.setName("自定義執行緒-" + threadNum.getAndIncrement());
return thread;
});
for (int i = 0; i < 5; i++) {
String taskName = "任務-" + i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "處理" + taskName);
});
}
executor.shutdown();
}
}
輸出:
自定義執行緒-1處理任務-0
自定義執行緒-3處理任務-2
自定義執行緒-2處理任務-1
自定義執行緒-4處理任務-3
自定義執行緒-5處理任務-4
程式碼中在任務中輸出了當前執行緒的名稱,可以看到是我們自定義的名稱。
通過jstack檢視執行緒的堆疊資訊,也可以看到我們自定義的名稱,我們可以將程式碼中executor.shutdown();
先給註釋掉讓程式先不退出,然後通過jstack檢視,如下:
4種常見飽和策略
當執行緒池中佇列已滿,並且執行緒池已達到最大執行緒數,執行緒池會將任務傳遞給飽和策略進行處理。這些策略都實現了RejectedExecutionHandler
介面。介面中有個方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
引數說明:
r:需要執行的任務
executor:當前執行緒池物件
JDK中提供了4種常見的飽和策略:
AbortPolicy:直接丟擲異常
CallerRunsPolicy:在當前呼叫者的執行緒中執行任務,即隨丟來的任務,由他自己去處理
DiscardOldestPolicy:丟棄佇列中最老的一個任務,即丟棄佇列頭部的一個任務,然後執行當前傳入的任務
DiscardPolicy:不處理,直接丟棄掉,方法內部為空
自定義飽和策略
需要實現RejectedExecutionHandler
介面。任務無法處理的時候,我們想記錄一下日誌,我們需要自定義一個飽和策略,示例程式碼:
package com.itsoku.chat16;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 跟著阿里p7學併發,微信公眾號:javacode2018
*/
public class Demo5 {
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
'}';
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
1,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r, executors) -> {
//自定義飽和策略
//記錄一下無法處理的任務
System.out.println("無法處理的任務:" + r.toString());
});
for (int i = 0; i < 5; i++) {
executor.execute(new Task("任務-" + i));
}
executor.shutdown();
}
}
輸出:
無法處理的任務:Task{name='任務-2'}
無法處理的任務:Task{name='任務-3'}
pool-1-thread-1處理任務-0
無法處理的任務:Task{name='任務-4'}
pool-1-thread-1處理任務-1
輸出結果中可以看到有3個任務進入了飽和策略中,記錄了任務的日誌,對於無法處理多工,我們最好能夠記錄一下,讓開發人員能夠知道。任務進入了飽和策略,說明執行緒池的配置可能不是太合理,或者機器的效能有限,需要做一些優化調整。
執行緒池中的2個關閉方法
執行緒池提供了2個關閉方法:shutdown
和shutdownNow
,當呼叫者兩個方法之後,執行緒池會遍歷內部的工作執行緒,然後呼叫每個工作執行緒的interrrupt方法給執行緒傳送中斷訊號,內部如果無法響應中斷訊號的可能永遠無法終止,所以如果內部有無線迴圈的,最好在迴圈內部檢測一下執行緒的中斷訊號,合理的退出。呼叫者兩個方法中任意一個,執行緒池的isShutdown
方法就會返回true,當所有的任務執行緒都關閉之後,才表示執行緒池關閉成功,這時呼叫isTerminaed
方法會返回true。
呼叫shutdown
方法之後,執行緒池將不再介面新任務,內部會將所有已提交的任務處理完畢,處理完畢之後,工作執行緒自動退出。
而呼叫shutdownNow
方法後,執行緒池會將還未處理的(在隊裡等待處理的任務)任務移除,將正在處理中的處理完畢之後,工作執行緒自動退出。
至於呼叫哪個方法來關閉執行緒,應該由提交到執行緒池的任務特性決定,多數情況下呼叫shutdown
方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫shutdownNow
方法。
擴充套件執行緒池
雖然jdk提供了ThreadPoolExecutor
這個高效能執行緒池,但是如果我們自己想在這個執行緒池上面做一些擴充套件,比如,監控每個任務執行的開始時間,結束時間,或者一些其他自定義的功能,我們應該怎麼辦?
這個jdk已經幫我們想到了,ThreadPoolExecutor
內部提供了幾個方法beforeExecute
、afterExecute
、terminated
,可以由開發人員自己去這些方法。看一下執行緒池內部的原始碼:
try {
beforeExecute(wt, task);//任務執行之前呼叫的方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);//任務執行完畢之後呼叫的方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
beforeExecute:任務執行之前呼叫的方法,有2個引數,第1個引數是執行任務的執行緒,第2個引數是任務
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute:任務執行完成之後呼叫的方法,2個引數,第1個引數表示任務,第2個引數表示任務執行時的異常資訊,如果無異常,第二個引數為null
protected void afterExecute(Runnable r, Throwable t) { }
terminated:執行緒池最終關閉之後呼叫的方法。所有的工作執行緒都退出了,最終執行緒池會退出,退出時呼叫該方法
示例程式碼:
package com.itsoku.chat16;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 跟著阿里p7學併發,微信公眾號:javacode2018
*/
public class Demo6 {
static class Task implements Runnable {
String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "處理" + this.name);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{" +
"name='" + name + '\'' +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
10,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r, executors) -> {
//自定義飽和策略
//記錄一下無法處理的任務
System.out.println("無法處理的任務:" + r.toString());
}) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(System.currentTimeMillis() + "," + t.getName() + ",開始執行任務:" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",任務:" + r.toString() + ",執行完畢!");
}
@Override
protected void terminated() {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",關閉執行緒池!");
}
};
for (int i = 0; i < 10; i++) {
executor.execute(new Task("任務-" + i));
}
TimeUnit.SECONDS.sleep(1);
executor.shutdown();
}
}
輸出:
1564324574847,pool-1-thread-1,開始執行任務:Task{name='任務-0'}
1564324574850,pool-1-thread-3,開始執行任務:Task{name='任務-2'}
pool-1-thread-3處理任務-2
1564324574849,pool-1-thread-2,開始執行任務:Task{name='任務-1'}
pool-1-thread-2處理任務-1
1564324574848,pool-1-thread-5,開始執行任務:Task{name='任務-4'}
pool-1-thread-5處理任務-4
1564324574848,pool-1-thread-4,開始執行任務:Task{name='任務-3'}
pool-1-thread-4處理任務-3
1564324574850,pool-1-thread-7,開始執行任務:Task{name='任務-6'}
pool-1-thread-7處理任務-6
1564324574850,pool-1-thread-6,開始執行任務:Task{name='任務-5'}
1564324574851,pool-1-thread-8,開始執行任務:Task{name='任務-7'}
pool-1-thread-8處理任務-7
pool-1-thread-1處理任務-0
pool-1-thread-6處理任務-5
1564324574851,pool-1-thread-10,開始執行任務:Task{name='任務-9'}
pool-1-thread-10處理任務-9
1564324574852,pool-1-thread-9,開始執行任務:Task{name='任務-8'}
pool-1-thread-9處理任務-8
1564324576851,pool-1-thread-2,任務:Task{name='任務-1'},執行完畢!
1564324576851,pool-1-thread-3,任務:Task{name='任務-2'},執行完畢!
1564324576852,pool-1-thread-1,任務:Task{name='任務-0'},執行完畢!
1564324576852,pool-1-thread-4,任務:Task{name='任務-3'},執行完畢!
1564324576852,pool-1-thread-8,任務:Task{name='任務-7'},執行完畢!
1564324576852,pool-1-thread-7,任務:Task{name='任務-6'},執行完畢!
1564324576852,pool-1-thread-5,任務:Task{name='任務-4'},執行完畢!
1564324576853,pool-1-thread-6,任務:Task{name='任務-5'},執行完畢!
1564324576853,pool-1-thread-10,任務:Task{name='任務-9'},執行完畢!
1564324576853,pool-1-thread-9,任務:Task{name='任務-8'},執行完畢!
1564324576853,pool-1-thread-9,關閉執行緒池!
從輸出結果中可以看到,每個需要執行的任務列印了3行日誌,執行前由執行緒池的beforeExecute
列印,執行時會呼叫任務的run
方法,任務執行完畢之後,會呼叫執行緒池的afterExecute
方法,從每個任務的首尾2條日誌中可以看到每個任務耗時2秒左右。執行緒池最終關閉之後呼叫了terminated
方法。
合理地配置執行緒池
要想合理的配置執行緒池,需要先分析任務的特性,可以衝一下幾個角度分析:
任務的性質:CPU密集型任務、IO密集型任務和混合型任務
任務的優先順序:高、中、低
任務的執行時間:長、中、短
任務的依賴性:是否依賴其他的系統資源,如資料庫連線。
性質不同任務可以用不同規模的執行緒池分開處理。CPU密集型任務應該儘可能小的執行緒,如配置cpu數量+1個執行緒的執行緒池。由於IO密集型任務並不是一直在執行任務,不能讓cpu閒著,則應配置儘可能多的執行緒,如:cup數量*2。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這2個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。可以通過Runtime.getRuntime().availableProcessors()
方法獲取cpu數量。優先順序不同任務可以對執行緒池採用優先順序佇列來處理,讓優先順序高的先執行。
使用佇列的時候建議使用有界佇列,有界佇列增加了系統的穩定性,如果採用無解佇列,任務太多的時候可能導致系統OOM,直接讓系統宕機。
執行緒池中執行緒數量的配置
執行緒池彙總執行緒大小對系統的效能有一定的影響,我們的目標是希望系統能夠發揮最好的效能,過多或者過小的執行緒數量無法有訊息的使用機器的效能。咋Java Concurrency inPractice書中給出了估算執行緒池大小的公式:
Ncpu = CUP的數量
Ucpu = 目標CPU的使用率,0<=Ucpu<=1
W/C = 等待時間與計算時間的比例
為儲存處理器達到期望的使用率,最有的執行緒池的大小等於:
Nthreads = Ncpu × Ucpu × (1+W/C)
一些使用建議
在《阿里巴巴java開發手冊》中指出了執行緒資源必須通過執行緒池提供,不允許在應用中自行顯示的建立執行緒,這樣一方面是執行緒的建立更加規範,可以合理控制開闢執行緒的數量;另一方面執行緒的細節管理交給執行緒池處理,優化了資源的開銷。而執行緒池不允許使用Executors去建立,而要通過ThreadPoolExecutor方式,這一方面是由於jdk中Executor框架雖然提供瞭如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等建立執行緒池的方法,但都有其侷限性,不夠靈活;另外由於前面幾種方法內部也是通過ThreadPoolExecutor方式實現,使用ThreadPoolExecutor有助於大家明確執行緒池的執行規則,建立符合自己的業務場景需要的執行緒池,避免資源耗盡的風險。
java高併發系列
- java高併發系列 - 第1天:必須知道的幾個概念
- java高併發系列 - 第2天:併發級別
- java高併發系列 - 第3天:有關並行的兩個重要定律
- java高併發系列 - 第4天:JMM相關的一些概念
- java高併發系列 - 第5天:深入理解程序和執行緒
- java高併發系列 - 第6天:執行緒的基本操作
- java高併發系列 - 第7天:volatile與Java記憶體模型
- java高併發系列 - 第8天:執行緒組
- java高併發系列 - 第9天:使用者執行緒和守護執行緒
- java高併發系列 - 第10天:執行緒安全和synchronized關鍵字
- java高併發系列 - 第11天:執行緒中斷的幾種方式
- java高併發系列 - 第12天JUC:ReentrantLock重入鎖
- java高併發系列 - 第13天:JUC中的Condition物件
- java高併發系列 - 第14天:JUC中的LockSupport工具類,必備技能
- java高併發系列 - 第15天:JUC中的Semaphore(訊號量)
- java高併發系列 - 第16天:JUC中等待多執行緒完成的工具類CountDownLatch,必備技能
- java高併發系列 - 第17天:JUC中的迴圈柵欄CyclicBarrier的6種使用場景
高併發系列連載中,感興趣的加我微信itsoku,一起交流,關注公眾號:路人甲Java,每天獲取最新連載文章!
相關推薦
跟著阿里p7一起學java高併發 - 第18天:玩轉java執行緒池,這一篇就夠了
java中的執行緒池,這一篇就夠了 java高併發系列第18篇文章。 本文主要內容 什麼是執行緒池 執行緒池實現原理 執行緒池中常見的各種佇列 自定義執行緒建立的工廠 常見的飽和策略 自定義飽和策略 執行緒池中兩種關閉方法有何不同 擴充套件執行緒池 合理地配置執行緒池 執行緒池中執行緒數量的配置 什麼是執
【萬字圖文-原創】 | 學會Java中的執行緒池,這一篇也許就夠了!
![](https://img2020.cnblogs.com/blog/799093/202005/799093-20200524082508581-233911575.png) ### 碎碎念 關於JDK原始碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的`666`人,真的很感謝之前幫我
【高併發】什麼是ForkJoin?看這一篇就夠了!
寫在前面 在JDK中,提供了這樣一種功能:它能夠將複雜的邏輯拆分成一個個簡單的邏輯來並行執行,待每個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果資料。有點像Hadoop中的MapReduce。 ForkJoin是由JDK1.7之後提供的多執行緒併發處理框架。ForkJoin框架的基本思想是
Java中的多線程你只要看這一篇就夠了
== 討論 cin 線程池。 locking nth lis dset tro 引 如果對什麽是線程、什麽是進程仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多線程只有一個目的,那就是更好的利用cpu的資源,因為所有的多線程代碼都可以用單線程來實現。
轉:Java中的多線程你只要看這一篇就夠了
無法 線程不安全 str his ace oat 情況下 containe live 如果對什麽是線程、什麽是進程仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多線程只有一個目的,那就是更好的利用cpu的資源,因為所有的多線程代碼都可以用單線程來實現
Java中的多執行緒你只要看這一篇就夠了(轉)
引 如果對什麼是執行緒、什麼是程序仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多執行緒只有一個目的,那就是更好的利用cpu的資源,因為所有的多執行緒程式碼都可以用單執行緒來實現。說這個話其實只有一半對,因為反應“多角色”的程式程式碼,最起碼每個角色要給他一個執行緒吧,否
學爬蟲利器XPath,看這一篇就夠了
XPath的使用 XPath,全稱 XML Path Language,即 XML 路徑語言,它是一門在XML文件中查詢資訊的語言。XPath 最初設計是用來搜尋XML文件的,但是它同樣適用於 HTML 文件的搜尋。 所以在做爬蟲時,我們完全可以使用 XPath 來做相應的資訊抽取,本節我們
Java反射機制你只要看這一篇就夠了
今天來總結一下Java反射機制,在此之前,回顧下java程式的編譯執行過程,分為三個階段:原始碼(.java檔案)進過編譯生成位元組碼檔案(.class檔案),然後jvm載入位元組碼檔案執行程式(runtime)。 前兩個步驟(編譯階段)是在硬碟上完成的,後一個步驟(執行階段)是在記憶體中完成的
有關Java反射的使用看這一篇就夠了
1. 簡介 本篇文章不探討反射的實現機制或者說實現原理,僅僅從使用的角度去講解我們常用的一些API介面,方便自己以後需要使用時信手拈來,同時也方便廣大博友能夠快速瞭解API的使用。 什麼是反射? 反射是java語言的一個特性,它允許一個java的類獲取他所有的成員變數
百萬併發下的Nginx優化,看這一篇就夠了!
本文作者主要分享在 Nginx 效能方面的實踐經驗,希望能給大家帶來一些系統化思考,幫助大家更有效地去做 Nginx。 優化方法論 我重點分享如下兩個問題: 保持併發連線數,怎麼樣做到記憶體有效使用。 在高併發的同時保持高吞吐量的重要要點。 實現層面主要是三方面優化,主要聚焦
Java中的多執行緒你只要看這一篇就夠了
/** * 生產者生產出來的產品交給店員 */ public synchronized void produce() { if(this.product >= MAX_PRODUCT) { try {
Java 中的多執行緒你只要看這一篇就夠了
如果對什麼是執行緒、什麼是程序仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多執行緒只有一個目的,那就是更好的利用cpu的資源,因為所有的多執行緒程式碼都可以用單執行緒來實現。說這個話其實只有一半對,因為反應“多角色”的程式程式碼,最起碼每個角色要給他一個
Java 中的單例模式,看完這一篇就夠了
單例模式是最常見的一個模式,在Java中單例模式被大量的使用。這同樣也是我在面試時最喜歡提到的一個面試問題,然後在面試者回答後可以進一步挖掘其細節,這不僅檢查了關於單例模式的相關知識,同時也檢查了面試者的編碼水平、多執行緒方面的知識,這些在實際的工作中非常重要。 在這個
Java 中的多線程你只要看這一篇就夠了
並發 進入 事務 相同 人工 出了 研究 class pool 引 如果對什麽是線程、什麽是進程仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多線程只有一個目的,那就是更好的利用cpu的資源,因為所有的多線程代碼都可以用單線程來實現。說這個話其實只有
Java NIO?看這一篇就夠了!
現在使用NIO的場景越來越多,很多網上的技術框架或多或少的使用NIO技術,譬如Tomcat,Jetty。學習和掌握NIO技術已經不是一個JAVA攻城獅的加分技能,而是一個必備技能。在前面2篇文章《什麼是Zero-Copy?》和《NIO相關基礎篇》中我們學習了NIO的相關理論知
執行緒(本文轉載而來) Java中的多執行緒你只要看這一篇就夠了
Java中的多執行緒你只要看這一篇就夠了 引 如果對什麼是執行緒、什麼是程序仍存有疑惑,請先Google之,因為這兩個概念不在本文的範圍之內。 用多執行緒只有一個目的,那就是更好的利用cpu的資源,因為所有的多執行緒程式碼都可以用單執行緒來實現。說
容器中 Java 應用程式的記憶體和 CPU 如何分配?看這一篇就夠了!
出品丨Docker公司(ID:docker-cn)編譯丨小東每週一、三、五,與您不見不散! 隨著2018年的結束,我們將回顧排名前五的最受讀者歡迎的文章。今天分享的第一篇文章,將幫助那些在容器中執行 Java 虛擬機器(JVM)時遇到記憶體和 CPU 大小調整/使用困難的人,本文將解釋如何在 D
Java開發必須熟悉的Linux命令看這一篇就夠了。
身為一個Java開發人員,這些常用的Linux命令必須掌握。即使平時開發過程中沒有使用Linux(Unix)或者mac系統,也需要熟練掌握Linux命令。因為很多伺服器上都是Linux系統。所以,要和伺服器機器互動,就要通過shell命令、身為伺服器後端經驗豐富
【Java面試題系列】:Java基礎知識面試題,看這一篇就夠了(持續更新)
文中面試題從茫茫網海中精心篩選,如有錯誤,歡迎指正! 1.前言 參加過社招的同學都瞭解,進入一家公司面試開發崗位時,填寫完個人資訊後,一般都會讓先做一份筆試題,然後公司會根據筆試題的回答結果,確定要不要繼續此次面試,如果答的不好,有些公司可能會直接說“技術經理或者總監在忙,你先回去等通知吧”,有些公司
【Java面試題系列】:Java基礎知識面試題,看這一篇就夠了
路徑 拼接 i++ misc min 中新 dem 總結 內容 文中面試題從茫茫網海中精心篩選,如有錯誤,歡迎指正! 1.前言 參加過社招的同學都了解,進入一家公司面試開發崗位時,填寫完個人信息後,一般都會讓先做一份筆試題,然後公司會根據筆試題的回答結果,確定要不要繼續此