1. 程式人生 > >Java高並發之線程池詳解

Java高並發之線程池詳解

大小 eight nds 程序退出 zab ron 策略 ace bubuko

線程池優勢

在業務場景中, 如果一個對象創建銷毀開銷比較大, 那麽此時建議池化對象進行管理.

例如線程, jdbc連接等等, 在高並發場景中, 如果可以復用之前銷毀的對象, 那麽系統效率將大大提升.

另外一個好處是可以設定池化對象的上限, 例如預防創建線程數量過多導致系統崩潰的場景.

jdk中的線程池

技術分享圖片

下文主要從以下幾個角度講解:

  • 創建線程池
  • 提交任務
  • 潛在宕機風險
  • 線程池大小配置
  • 自定義阻塞隊列BlockingQueue
  • 回調接口
  • 自定義拒絕策略
  • 自定義ThreadFactory
  • 關閉線程池

創建線程池

我們可以通過自定義ThreadPoolExecutor或者jdk內置的Executors

來創建一系列的線程池

  • newFixedThreadPool: 創建固定線程數量的線程池
  • newSingleThreadExecutor: 創建單一線程的池
  • newCachedThreadPool: 創建線程數量自動擴容, 自動銷毀的線程池
  • newScheduledThreadPool: 創建支持計劃任務的線程池

上述幾種都是通過new ThreadPoolExecutor()來實現的, 構造函數源碼如下:

 1     /**
 2      * @param corePoolSize 池內核心線程數量, 超出數量的線程會進入阻塞隊列
 3      * @param maximumPoolSize 最大可創建線程數量
4 * @param keepAliveTime 線程存活時間 5 * @param unit 存活時間的單位 6 * @param workQueue 線程溢出後的阻塞隊列 7 */ 8 public ThreadPoolExecutor(int corePoolSize, 9 int maximumPoolSize, 10 long keepAliveTime, 11 TimeUnit unit,
12 BlockingQueue<Runnable> workQueue) { 13 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); 14 } 15 16 public static ExecutorService newFixedThreadPool(int nThreads) { 17 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 18 } 19 20 public static ExecutorService newSingleThreadExecutor() { 21 return new Executors.FinalizableDelegatedExecutorService 22 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); 23 } 24 25 public static ExecutorService newCachedThreadPool() { 26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 27 } 28 29 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 30 return new ScheduledThreadPoolExecutor(corePoolSize); 31 } 32 33 public ScheduledThreadPoolExecutor(int corePoolSize) { 34 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); 35 }

提交任務

直接調用executorService.execute(runnable)或者submit(runnable)即可,

execute和submit的區別在於submit會返回Future來獲取任何執行的結果.

我們看下newScheduledThreadPool的使用示例.

 1 public class SchedulePoolDemo {
 2 
 3     public static void main(String[] args){
 4         ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 5         // 如果前面的任務沒有完成, 調度也不會啟動
 6         service.scheduleAtFixedRate(new Runnable() {
 7             @Override
 8             public void run() {
 9                 try {
10                     Thread.sleep(2000);
11                     // 每兩秒打印一次.
12                     System.out.println(System.currentTimeMillis()/1000);
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 }
16             }
17         }, 0, 2, TimeUnit.SECONDS);
18     }
19 }

潛在宕機風險

使用Executors來創建要註意潛在宕機風險.其返回的線程池對象的弊端如下:

  • FixedThreadPool和SingleThreadPoolPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM.
  • CachedThreadPool和ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM.

綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來創建線程池, 具體構造函數配置見下文.

線程池大小配置

一般根據任務類型進行區分, 假設CPU為N核

  • CPU密集型任務需要減少線程數量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
  • IO密集型任務則可以加大線程數量, 可配置線程池大小為 N * 2.
  • 混合型任務則可以拆分為CPU密集型與IO密集型, 獨立配置.

自定義阻塞隊列BlockingQueue

主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不同的排隊隊列.

  • ArrayBlockingQueue:先進先出隊列,創建時指定大小, 有界;
  • LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小為Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一起離開.
  • PriorityBlockingQueue: 支持優先級的隊列

回調接口

線程池提供了一些回調方法, 具體使用如下所示.

 1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
 2 
 3             @Override
 4             protected void beforeExecute(Thread t, Runnable r) {
 5                 System.out.println("準備執行任務: " + r.toString());
 6             }
 7 
 8             @Override
 9             protected void afterExecute(Runnable r, Throwable t) {
10                 System.out.println("結束任務: " + r.toString());
11             }
12 
13             @Override
14             protected void terminated() {
15                 System.out.println("線程池退出");
16             }
17         };

可以在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短時間等等, 還有一些其他的屬性如下:

  • taskCount:線程池需要執行的任務數量.
  • completedTaskCount:線程池在運行過程中已完成的任務數量.小於或等於taskCount.
  • largestPoolSize:線程池曾經創建過的最大線程數量.通過這個數據可以知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
  • getPoolSize:線程池的線程數量.如果線程池不銷毀的話,池裏的線程不會自動銷毀,所以這個大小只增不減.
  • getActiveCount:獲取活動的線程數.

自定義拒絕策略

線程池滿負荷運轉後, 因為時間空間的問題, 可能需要拒絕掉部分任務的執行.

jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略

  • AbortPolicy: 直接拒絕策略, 拋出異常.
  • CallerRunsPolicy: 調用者自己執行任務策略.
  • DiscardOldestPolicy: 舍棄最老的未執行任務策略.

使用方式也很簡單, 直接傳參給ThreadPool

1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
2                 new SynchronousQueue<Runnable>(),
3                 Executors.defaultThreadFactory(),
4                 new RejectedExecutionHandler() {
5                     @Override
6                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
7                         System.out.println("reject task: " + r.toString());
8                     }
9                 });

自定義ThreadFactory

線程工廠用於創建池裏的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.

或者統一指定線程優先級, 設置名稱等等.

 1 class NamedThreadFactory implements ThreadFactory {
 2     private static final AtomicInteger threadIndex = new AtomicInteger(0);
 3     private final String baseName;
 4     private final boolean daemon;
 5 
 6     public NamedThreadFactory(String baseName) {
 7         this(baseName, true);
 8     }
 9 
10     public NamedThreadFactory(String baseName, boolean daemon) {
11         this.baseName = baseName;
12         this.daemon = daemon;
13     }
14 
15     public Thread newThread(Runnable runnable) {
16         Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
17         thread.setDaemon(this.daemon);
18         return thread;
19     }
20 }

關閉線程池

跟直接new Thread不一樣, 局部變量的線程池, 需要手動關閉, 不然會導致線程泄漏問題.

默認提供兩種方式關閉線程池.

  • shutdown: 等所有任務, 包括阻塞隊列中的執行完, 才會終止, 但是不會接受新任務.
  • shutdownNow: 立即終止線程池, 打斷正在執行的任務, 清空隊列.

Java高並發之線程池詳解