一時技癢,擼了個動態執行緒池,原始碼放Github了
阿新 • • 發佈:2020-06-17
## 闡述背景
執行緒池在日常工作中用的還挺多,當需要非同步,批量處理一些任務的時候我們會定義一個執行緒池來處理。
在使用執行緒池的過程中有一些問題,下面簡單介紹下之前遇到的一些問題。
**場景一**:實現一些批量處理資料的功能,剛開始執行緒池的核心執行緒數設的比較小,然後想調整下,只能改完後重啟應用。
**場景二**:有一個任務處理的應用,會接收 MQ 的訊息進行任務的處理,執行緒池的佇列也允許快取一定數量的任務。當任務處理的很慢的時候,想看看到底有多少沒有處理完不是很方便。當時為了快速方便,就直接啟動了一個執行緒去迴圈列印執行緒池佇列的大小。
正好之前在我公眾號有轉發過美團的一篇執行緒池應用的文章([https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w](https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w)),覺得他們的思路非常好,就是沒有開放原始碼,所以自己就抽時間在我的開源專案 Kitty 中增加了一個動態執行緒池的元件,支援了 Cat 監控,動態變更核心引數,任務堆積告警等。今天就給大家分享一下實現的方式。
**專案原始碼地址**:[https://github.com/yinjihuan/kitty](https://github.com/yinjihuan/kitty "https://github.com/yinjihuan/kitty")
## 使用方式
### 新增依賴
依賴執行緒池的元件,目前 Kitty 未釋出,需要自己下載原始碼 install 本地或者私有倉庫。
```java
```
### 新增配置
然後在 Nacos 配置執行緒池的資訊,我的這個整合了 Nacos。推薦一個應用建立一個單獨的執行緒池配置檔案,比如我們這個叫 dataId 為 kitty-cloud-thread-pool.properties,group 為 BIZ_GROUP。
內容如下:
```plain
kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties
kitty.threadpools.nacosGroup=BIZ_GROUP
kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d
kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00
kitty.threadpools.owner=尹吉歡
kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor
kitty.threadpools.executors[0].corePoolSize=4
kitty.threadpools.executors[0].maximumPoolSize=4
kitty.threadpools.executors[0].queueCapacity=5
kitty.threadpools.executors[0].queueCapacityThreshold=5
kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2
kitty.threadpools.executors[1].corePoolSize=2
kitty.threadpools.executors[1].maximumPoolSize=4
```
**nacosDataId,nacosGroup**
監聽配置修改的時候需要知道監聽哪個 DataId,值就是當前配置的 DataId。
**accessToken,secret**
釘釘機器人的驗證資訊,用於告警。
**owner**
這個應用的負責人,告警的訊息中會顯示。
**threadPoolName**
執行緒池的名稱,使用的時候需要關注。
剩下的配置就不一一介紹了,跟執行緒池內部的引數一致,還有一些可以檢視原始碼得知。
### 注入使用
```plain
@Autowired
private DynamicThreadPoolManager dynamicThreadPoolManager;
dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> {
log.info("執行緒池的使用");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "getArticle");
```
通過 DynamicThreadPoolManager 的 getThreadPoolExecutor 方法獲取執行緒池物件,然後傳入 Runnable,Callable 等。第二個引數是這個任務的名稱,之所以要擴充套件一個引數是因為如果任務沒有標識,那麼無法區分任務。
這個執行緒池元件預設集成了 Cat 打點,設定了名稱可以在 Cat 上檢視這個任務相關的監控資料。
## 擴充套件功能
### 任務執行情況監控
在 Cat 的 Transaction 報表中會以執行緒池的名稱為型別顯示。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130049725-2134887428.png)
詳情中會以任務的名稱顯示。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130100499-38200194.png)
### 核心引數動態修改
核心引數目前只支援 corePoolSize,maximumPoolSize,queueCapacity(佇列型別為 LinkedBlockingDeque 才可以修改),rejectedExecutionType,keepAliveTime,unit 這些引數的修改。
一般 corePoolSize,maximumPoolSize,queueCapacity 是最常要動態改變的。
需要改動的話直接在 Nacos 中將對應的配置值修改即可,客戶端會監聽配置的修改,然後同步修改先執行緒池的引數。
### 佇列容量告警
queueCapacityThreshold 是佇列容量告警的閥值,如果佇列中的任務數量超過了 queueCapacityThreshold 就會告警。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130114750-1573450228.png)
### 拒絕次數告警
當佇列容量滿了後,新進來的任務會根據使用者設定的拒絕策略去選擇對應的處理方式。如果是採用 AbortPolicy 策略,也會進行告警。相當於消費者已經超負荷了。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130124578-741847936.png)
### 執行緒池執行情況
底層對接了 Cat,所以將執行緒的執行資料上報給了 Cat。我們可以在 Cat 中檢視這些資訊。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130133702-1941792723.png)
如果你想在自己的平臺去展示,我這邊暴露了/actuator/thread-pool 端點,你可以自行拉取資料。
```plain
{
threadPools: [{
threadPoolName: "TestThreadPoolExecutor",
activeCount: 0,
keepAliveTime: 0,
largestPoolSize: 4,
fair: false,
queueCapacity: 5,
queueCapacityThreshold: 2,
rejectCount: 0,
waitTaskCount: 0,
taskCount: 5,
unit: "MILLISECONDS",
rejectedExecutionType: "AbortPolicy",
corePoolSize: 4,
queueType: "LinkedBlockingQueue",
completedTaskCount: 5,
maximumPoolSize: 4
}, {
threadPoolName: "TestThreadPoolExecutor2",
activeCount: 0,
keepAliveTime: 0,
largestPoolSize: 0,
fair: false,
queueCapacity: 2147483647,
queueCapacityThreshold: 2147483647,
rejectCount: 0,
waitTaskCount: 0,
taskCount: 0,
unit: "MILLISECONDS",
rejectedExecutionType: "AbortPolicy",
corePoolSize: 2,
queueType: "LinkedBlockingQueue",
completedTaskCount: 0,
maximumPoolSize: 4
}]
}
```
### 自定義拒絕策略
平時我們使用程式碼建立執行緒池可以自定義拒絕策略,在構造執行緒池物件的時候傳入即可。這裡由於建立執行緒池都被封裝好了,我們只能在 Nacos 配置拒絕策略的名稱來使用對應的策略。預設是可以配置 JDK 自帶的 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 這四種。
如果你想自定義的話也是支援的,定義方式跟以前一樣,如下:
```plain
@Slf4j
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("進來了。。。。。。。。。");
}
}
```
要讓這個策略生效的話使用的是 SPI 的方式,需要在 resources 下面建立一個 META-INF 的資料夾,然後建立一個 services 的資料夾,再建立一個 java.util.concurrent.RejectedExecutionHandler 的檔案,內容為你定義的類全路徑。
![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130145126-1021996058.png)
### 自定義告警方式
預設是內部集成了釘釘機器人的告警方式,如果你不想用也可以將其關閉。或者將告警資訊對接到你的監控平臺去。
如果沒有告警平臺也可以在專案中實現新的告警方式,比如簡訊等。
只需要實現 ThreadPoolAlarmNotify 這個類即可。
```plain
/**
* 自定義簡訊告警通知
*
* @作者 尹吉歡
* @個人微信 jihuan900
* @微信公眾號 猿天地
* @GitHub https://github.com/yinjihuan
* @作者介紹 http://cxytiandi.com/about
* @時間 2020-05-27 22:26
*/
@Slf4j
@Component
public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify {
@Override
public void alarmNotify(AlarmMessage alarmMessage) {
log.info(alarmMessage.toString());
}
}
```
## 程式碼實現
具體的就不講的很細了,原始碼在[https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool](https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool "https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool"),大家自己去看,並不複雜。
### 建立執行緒池
根據配置建立執行緒池,ThreadPoolExecutor 是自定義的,因為需要做 Cat 埋點。
```plain
/**
* 建立執行緒池
* @param threadPoolProperties
*/
private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) {
threadPoolProperties.getExecutors().forEach(executor -> {
KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor(
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getKeepAliveTime(),
executor.getUnit(),
getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()),
new KittyThreadFactory(executor.getThreadPoolName()),
getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()), executor.getThreadPoolName());
threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor);
});
}
```
### 重新整理執行緒池
首先需要監聽 Nacos 的修改。
```plain
/**
* 監聽配置修改,spring-cloud-alibaba 2.1.0版本不支援@NacosConfigListener的監聽
*/
public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
ConfigService configService = nacosConfigProperties.configServiceInstance();
try {
configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
new Thread(() -> refreshThreadPoolExecutor()).start();
log.info("執行緒池配置有變化,重新整理完成");
}
});
} catch (NacosException e) {
log.error("Nacos配置監聽異常", e);
}
}
```
然後再重新整理執行緒池的引數資訊,由於監聽事件觸發的時候,這個時候配置其實還沒重新整理,所以我就等待了 1 秒鐘,讓配置完成重新整理然後直接從配置類取值。
雖然有點挫還是可以用,其實更好的方式是解析 receiveConfigInfo 那個 configInfo,configInfo 就是改變之後的整個配置內容。因為不太好解析成屬性檔案,就沒做,後面再改吧。
```plain
/**
* 重新整理執行緒池
*/
private void refreshThreadPoolExecutor() {
try {
// 等待配置重新整理完成
Thread.sleep(1000);
} catch (InterruptedException e) {
}
dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName());
threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize());
threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit());
threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()));
Block