1. 程式人生 > >一時技癢,擼了個動態執行緒池,原始碼放Github了

一時技癢,擼了個動態執行緒池,原始碼放Github了

## 闡述背景 執行緒池在日常工作中用的還挺多,當需要非同步,批量處理一些任務的時候我們會定義一個執行緒池來處理。 在使用執行緒池的過程中有一些問題,下面簡單介紹下之前遇到的一些問題。 **場景一**:實現一些批量處理資料的功能,剛開始執行緒池的核心執行緒數設的比較小,然後想調整下,只能改完後重啟應用。 **場景二**:有一個任務處理的應用,會接收 MQ 的訊息進行任務的處理,執行緒池的佇列也允許快取一定數量的任務。當任務處理的很慢的時候,想看看到底有多少沒有處理完不是很方便。當時為了快速方便,就直接啟動了一個執行緒去迴圈列印執行緒池佇列的大小。 正好之前在我公眾號有轉發過美團的一篇執行緒池應用的文章([https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w](https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w)),覺得他們的思路非常好,就是沒有開放原始碼,所以自己就抽時間在我的開源專案 Kitty 中增加了一個動態執行緒池的元件,支援了 Cat 監控,動態變更核心引數,任務堆積告警等。今天就給大家分享一下實現的方式。 **專案原始碼地址**:[https://github.com/yinjihuan/kitty](https://github.com/yinjihuan/kitty "https://github.com/yinjihuan/kitty") ## 使用方式 ### 新增依賴 依賴執行緒池的元件,目前 Kitty 未釋出,需要自己下載原始碼 install 本地或者私有倉庫。 ```java ``` ### 新增配置 然後在 Nacos 配置執行緒池的資訊,我的這個整合了 Nacos。推薦一個應用建立一個單獨的執行緒池配置檔案,比如我們這個叫 dataId 為 kitty-cloud-thread-pool.properties,group 為 BIZ_GROUP。 內容如下: ```plain kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties kitty.threadpools.nacosGroup=BIZ_GROUP kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00 kitty.threadpools.owner=尹吉歡 kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor kitty.threadpools.executors[0].corePoolSize=4 kitty.threadpools.executors[0].maximumPoolSize=4 kitty.threadpools.executors[0].queueCapacity=5 kitty.threadpools.executors[0].queueCapacityThreshold=5 kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2 kitty.threadpools.executors[1].corePoolSize=2 kitty.threadpools.executors[1].maximumPoolSize=4 ``` **nacosDataId,nacosGroup** 監聽配置修改的時候需要知道監聽哪個 DataId,值就是當前配置的 DataId。 **accessToken,secret** 釘釘機器人的驗證資訊,用於告警。 **owner** 這個應用的負責人,告警的訊息中會顯示。 **threadPoolName** 執行緒池的名稱,使用的時候需要關注。 剩下的配置就不一一介紹了,跟執行緒池內部的引數一致,還有一些可以檢視原始碼得知。 ### 注入使用 ```plain @Autowired private DynamicThreadPoolManager dynamicThreadPoolManager; dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> { log.info("執行緒池的使用"); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }, "getArticle"); ``` 通過 DynamicThreadPoolManager 的 getThreadPoolExecutor 方法獲取執行緒池物件,然後傳入 Runnable,Callable 等。第二個引數是這個任務的名稱,之所以要擴充套件一個引數是因為如果任務沒有標識,那麼無法區分任務。 這個執行緒池元件預設集成了 Cat 打點,設定了名稱可以在 Cat 上檢視這個任務相關的監控資料。 ## 擴充套件功能 ### 任務執行情況監控 在 Cat 的 Transaction 報表中會以執行緒池的名稱為型別顯示。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130049725-2134887428.png) 詳情中會以任務的名稱顯示。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130100499-38200194.png) ### 核心引數動態修改 核心引數目前只支援 corePoolSize,maximumPoolSize,queueCapacity(佇列型別為 LinkedBlockingDeque 才可以修改),rejectedExecutionType,keepAliveTime,unit 這些引數的修改。 一般 corePoolSize,maximumPoolSize,queueCapacity 是最常要動態改變的。 需要改動的話直接在 Nacos 中將對應的配置值修改即可,客戶端會監聽配置的修改,然後同步修改先執行緒池的引數。 ### 佇列容量告警 queueCapacityThreshold 是佇列容量告警的閥值,如果佇列中的任務數量超過了 queueCapacityThreshold 就會告警。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130114750-1573450228.png) ### 拒絕次數告警 當佇列容量滿了後,新進來的任務會根據使用者設定的拒絕策略去選擇對應的處理方式。如果是採用 AbortPolicy 策略,也會進行告警。相當於消費者已經超負荷了。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130124578-741847936.png) ### 執行緒池執行情況 底層對接了 Cat,所以將執行緒的執行資料上報給了 Cat。我們可以在 Cat 中檢視這些資訊。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130133702-1941792723.png) 如果你想在自己的平臺去展示,我這邊暴露了/actuator/thread-pool 端點,你可以自行拉取資料。 ```plain { threadPools: [{ threadPoolName: "TestThreadPoolExecutor", activeCount: 0, keepAliveTime: 0, largestPoolSize: 4, fair: false, queueCapacity: 5, queueCapacityThreshold: 2, rejectCount: 0, waitTaskCount: 0, taskCount: 5, unit: "MILLISECONDS", rejectedExecutionType: "AbortPolicy", corePoolSize: 4, queueType: "LinkedBlockingQueue", completedTaskCount: 5, maximumPoolSize: 4 }, { threadPoolName: "TestThreadPoolExecutor2", activeCount: 0, keepAliveTime: 0, largestPoolSize: 0, fair: false, queueCapacity: 2147483647, queueCapacityThreshold: 2147483647, rejectCount: 0, waitTaskCount: 0, taskCount: 0, unit: "MILLISECONDS", rejectedExecutionType: "AbortPolicy", corePoolSize: 2, queueType: "LinkedBlockingQueue", completedTaskCount: 0, maximumPoolSize: 4 }] } ``` ### 自定義拒絕策略 平時我們使用程式碼建立執行緒池可以自定義拒絕策略,在構造執行緒池物件的時候傳入即可。這裡由於建立執行緒池都被封裝好了,我們只能在 Nacos 配置拒絕策略的名稱來使用對應的策略。預設是可以配置 JDK 自帶的 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 這四種。 如果你想自定義的話也是支援的,定義方式跟以前一樣,如下: ```plain @Slf4j public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.info("進來了。。。。。。。。。"); } } ``` 要讓這個策略生效的話使用的是 SPI 的方式,需要在 resources 下面建立一個 META-INF 的資料夾,然後建立一個 services 的資料夾,再建立一個 java.util.concurrent.RejectedExecutionHandler 的檔案,內容為你定義的類全路徑。 ![](https://img2020.cnblogs.com/blog/1618095/202006/1618095-20200617130145126-1021996058.png) ### 自定義告警方式 預設是內部集成了釘釘機器人的告警方式,如果你不想用也可以將其關閉。或者將告警資訊對接到你的監控平臺去。 如果沒有告警平臺也可以在專案中實現新的告警方式,比如簡訊等。 只需要實現 ThreadPoolAlarmNotify 這個類即可。 ```plain /** * 自定義簡訊告警通知 * * @作者 尹吉歡 * @個人微信 jihuan900 * @微信公眾號 猿天地 * @GitHub https://github.com/yinjihuan * @作者介紹 http://cxytiandi.com/about * @時間 2020-05-27 22:26 */ @Slf4j @Component public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify { @Override public void alarmNotify(AlarmMessage alarmMessage) { log.info(alarmMessage.toString()); } } ``` ## 程式碼實現 具體的就不講的很細了,原始碼在[https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool](https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool "https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool"),大家自己去看,並不複雜。 ### 建立執行緒池 根據配置建立執行緒池,ThreadPoolExecutor 是自定義的,因為需要做 Cat 埋點。 ```plain /** * 建立執行緒池 * @param threadPoolProperties */ private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) { threadPoolProperties.getExecutors().forEach(executor -> { KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor( executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(), executor.getUnit(), getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()), new KittyThreadFactory(executor.getThreadPoolName()), getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()), executor.getThreadPoolName()); threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor); }); } ``` ### 重新整理執行緒池 首先需要監聽 Nacos 的修改。 ```plain /** * 監聽配置修改,spring-cloud-alibaba 2.1.0版本不支援@NacosConfigListener的監聽 */ public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) { ConfigService configService = nacosConfigProperties.configServiceInstance(); try { configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { new Thread(() -> refreshThreadPoolExecutor()).start(); log.info("執行緒池配置有變化,重新整理完成"); } }); } catch (NacosException e) { log.error("Nacos配置監聽異常", e); } } ``` 然後再重新整理執行緒池的引數資訊,由於監聽事件觸發的時候,這個時候配置其實還沒重新整理,所以我就等待了 1 秒鐘,讓配置完成重新整理然後直接從配置類取值。 雖然有點挫還是可以用,其實更好的方式是解析 receiveConfigInfo 那個 configInfo,configInfo 就是改變之後的整個配置內容。因為不太好解析成屬性檔案,就沒做,後面再改吧。 ```plain /** * 重新整理執行緒池 */ private void refreshThreadPoolExecutor() { try { // 等待配置重新整理完成 Thread.sleep(1000); } catch (InterruptedException e) { } dynamicThreadPoolProperties.getExecutors().forEach(executor -> { ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName()); threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize()); threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize()); threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit()); threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName())); Block