1. 程式人生 > >Java並發(六)線程池監控

Java並發(六)線程池監控

可用性 completed 越來越大 keep incr 構造 日誌 開發 task

目錄

  一、線程池監控參數

  二、線程池監控類

  三、註意事項

在上一篇博文中,我們介紹了線程池的基本原理和使用方法。了解了基本概念之後,我們可以使用 Executors 類創建線程池來執行大量的任務,使用線程池的並發特性提高系統的吞吐量。但是,線程池使用不當也會使服務器資源枯竭,導致異常情況的發生,比如固定線程池的阻塞隊列任務數量過多、緩存線程池創建的線程過多導致內存溢出、系統假死等問題。因此,我們需要一種簡單的監控方案來監控線程池的使用情況,比如完成任務數量、未完成任務數量、線程大小等信息。

一、線程池監控參數

上一篇博文提到,線程池提供了以下幾個方法可以監控線程池的使用情況:

方法 含義
getActiveCount() 線程池中正在執行任務的線程數量
getCompletedTaskCount() 線程池已完成的任務數量,該值小於等於taskCount
getCorePoolSize() 線程池的核心線程數量
getLargestPoolSize() 線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize
getMaximumPoolSize() 線程池的最大線程數量
getPoolSize() 線程池當前的線程數量
getTaskCount() 線程池已經執行的和未執行的任務總數

通過這些方法,可以對線程池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,可以擴展這些方法在執行前或執行後增加一些新的操作,例如統計線程池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展。

二、線程池監控類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import
java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 繼承ThreadPoolExecutor類,覆蓋了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute() * 方法來統計線程池的執行情況 * <p> * Created by on 2019/4/19. */ public class ThreadPoolMonitor extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class); /** * 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間 */ private ConcurrentHashMap<String, Date> startTimes; /** * 線程池名稱,一般以業務名稱命名,方便區分 */ private String poolName; /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閑時間 * @param unit 空閑時間的單位 * @param workQueue 保存被提交任務的隊列 * @param poolName 線程池名稱 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), poolName); } /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閑時間 * @param unit 空閑時間的單位 * @param workQueue 保存被提交任務的隊列 * @param threadFactory 線程工廠 * @param poolName 線程池名稱 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.startTimes = new ConcurrentHashMap<>(); this.poolName = poolName; } /** * 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池情況 */ @Override public void shutdown() { // 統計已執行任務、正在執行任務、未執行任務數量 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 線程池立即關閉時,統計線程池情況 */ @Override public List<Runnable> shutdownNow() { // 統計已執行任務、正在執行任務、未執行任務數量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任務執行之前,記錄任務開始時間 */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); } /** * 任務執行之後,計算任務結束時間 */ @Override protected void afterExecute(Runnable r, Throwable t) { Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); // 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、 // 已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、 // 最大允許的線程數、線程空閑時間、線程池是否關閉、線程池是否終止 LOGGER.info("{}-pool-monitor: " + "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } /** * 創建固定線程池,代碼源於Executors.newFixedThreadPool方法,這裏增加了poolName * * @param nThreads 線程數量 * @param poolName 線程池名稱 * @return ExecutorService對象 */ public static ExecutorService newFixedThreadPool(int nThreads, String poolName) { return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName); } /** * 創建緩存型線程池,代碼源於Executors.newCachedThreadPool方法,這裏增加了poolName * * @param poolName 線程池名稱 * @return ExecutorService對象 */ public static ExecutorService newCachedThreadPool(String poolName) { return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName); } /** * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; /** * 初始化線程工廠 * * @param poolName 線程池名稱 */ EventThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }

ThreadPoolMonitor 類繼承了 ThreadPoolExecutor 類,重寫了shutdown()shutdownNow()beforeExecute()afterExecute()方法來統計線程池的執行情況,這四個方法是 ThreadPoolExecutor 類預留給開發者進行擴展的方法,具體如下:

方法 含義
shutdown() 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計已執行任務、正在執行任務、未執行任務數量
shutdownNow() 線程池立即關閉時,統計已執行任務、正在執行任務、未執行任務數量
beforeExecute(Thread t, Runnable r) 任務執行之前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode為key,開始時間為值
afterExecute(Runnable r, Throwable t) 任務執行之後,計算任務結束時間。統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、最大允許的線程數、線程空閑時間、線程池是否關閉、線程池是否終止信息

監控日誌:

22:50:25.376 [cellphone-pool-1-thread-3] INFO org.cellphone.common.pool.ThreadPoolMonitor - cellphone-pool-monitor: Duration: 1009 ms, PoolSize: 3, CorePoolSize: 3, Active: 1, Completed: 17, Task: 18, Queue: 0, LargestPoolSize: 3, MaximumPoolSize: 3,  KeepAliveTime: 0, isShutdown: false, isTerminated: false

一般我們會依賴 beforeExecuteafterExecute 這兩個方法統計的信息,具體原因請參考需要註意部分的最後一項。有了這些信息之後,我們可以根據業務情況和統計的線程池信息合理調整線程池大小,根據任務耗時長短對自身服務和依賴的其他服務進行調優,提高服務的可用性。

三、註意事項

1. 在 afterExecute 方法中需要註意,需要調用 ConcurrentHashMapremove 方法移除並返回任務的開始時間信息,而不是調用 get 方法,因為在高並發情況下,線程池裏要執行的任務很多,如果只獲取值不移除的話,會使 ConcurrentHashMap 越來越大,引發內存泄漏或溢出問題。該行代碼如下:

Date startDate = startTimes.remove(String.valueOf(r.hashCode()));

2. 有了ThreadPoolMonitor類之後,我們可以通過 newFixedThreadPool(int nThreads, String poolName)newCachedThreadPool(String poolName) 方法創建兩個日常我們使用最多的線程池,跟默認的 Executors 裏的方法不同的是,這裏需要傳入 poolName 參數,該參數主要是用來給線程池定義一個與業務相關並有具體意義的線程池名字,方便我們排查線上問題。

3. 在生產環境中,謹慎調用 shutdown()shutdownNow() 方法,因為調用這兩個方法之後,線程池會被關閉,不再接收新的任務,如果有新任務提交到一個被關閉的線程池,會拋出 java.util.concurrent.RejectedExecutionException 異常。其實在使用Spring等框架來管理類的生命周期的條件下,也沒有必要調用這兩個方法來關閉線程池,線程池的生命周期完全由該線程池所屬的Spring管理的類決定。

Java並發(六)線程池監控