Java線程池之ThreadPoolExecutor
前言
線程池可以提高程序的並發性能(當然是合適的情況下),因為對於沒有線程的情況下,我們每一次提交任務都新建一個線程,這種方法存在不少缺陷:
1. 線程的創建和銷毀的開銷非常高,線程的創建需要時間,會延遲任務的執行,會消耗大量的系統資源。
2. 活躍的線程會消耗系統資源,而大量的空閑線程會占用許多內存,給垃圾回收器帶來很大的壓力,而大量線程在競爭CPU資源的時間還會產生氣體的性能開銷。
3. 系統在可創建的線程上存在一個限制,如果超過了這個限制,很可能拋出OOM。
我們不難發現,在一定範圍下,增加線程能夠提高系統的吞吐量,而當線程數超過合理值後,增加的線程反而會降低程序的執行速度。
Executor的引入
在JDK1.5中,我們引入了一個線程池框架,Executor框架,它能夠分解任務的創建和執行過程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor實現類等。
註意點
當然了看待事物需要辯證,是否使用了Executor框架就能很好地將復雜的任務執行解耦開來的。這邊我們其實需要限制一下它的使用範圍。
- 對於依賴型的任務而言,不是很適合使用線程池去操作,容易引發死鎖,因為這種情況下我們需要小心維持這些任務的執行順序,以保證不會觸發死鎖。
- 對於通過線程封閉實現線程安全的任務而言,使用單線程的Executor能夠保證更安全的並發。
- 使用ThreadLocal的任務,因為線程池會復用線程,這將導致任務的ThreadLocal值失去意義(除非線程本地值受限於任務的生命周期)。
- 對響應時間敏感的任務,假設我們將一個執行時間很長的任務,或者多個執行時間很長的任務放到一個單線程的Executor中或者一個包含少量線程的線程池中,都會降低程序的響應速度。
合適的線程數
線程池在對處理同一類型的任務且相互獨立的時候,能達到性能上的最佳,否則任務時長不一致很容易引起擁塞或是饑餓。
那線程池的線程數以多少為合適呢,對於計算密集型的任務而言,我們最好設置的線程數 = CPU數+1;(+1是為了保證當某個線程因為缺頁故障或其他原因而暫停時,這個+1的線程能夠確保CPU的時鐘周期不會被浪費);而對於包含IO操作或者其他阻塞操作的任務時,由於線程不會一直執行,所以線程池的規模應該更大點。
線程池的使用
在實際使用過程中,我們一般借助於ThreadPoolExecutor來完成線程池的創建。ThreadPoolExecutor具有極好的擴展性,除了系統提供的四種常用的線程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。我們可以自定義線程池的構造函數,如線程池的基本線程數、最大線程數、線程池的超時時間(時間+時間單位),線程池的任務隊列,線程池的線程工廠,線程池的飽和策略。
當然,我們在使用系統提供的四種線程池的時候,同樣可以在後來修改線程池的配置。
線程的任務隊列
LinkedBlockQueue:無界 CachedThreadPoolExecutor FixedThreadPoolExecutor的默認任務隊列
ArrayBlockQueue:有界
PriorityQueue:優先級隊列,有界 ScheduledThreadPoolExecutor的默認任務隊列
SynchronousQueue:同步移交,隊列容量為0,僅當有線程準備好時才會將任務放到隊列中。
飽和策略
飽和策略的設置
對於有界隊列而言,當有界隊列被填滿後,這時候我們需要用到線程的飽和策略,前面提到我們可以在後面配置線程池的設置,而飽和策略的修改就是通過ThreadPoolExecutor的setRejectedExecutionHandler方法來進行修改的(當某個任務唄提交到一個已經被關閉的Executor中,也會用到飽和策略)
飽和策略主要有以下幾種:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
- AbortPolicy:中止策略,該策略將拋出未受檢的RejectedExecution,調用者可以捕獲這個異常根據實際需要編寫直接的處理代碼。
- CallerRunsPolicy:調用者運行,該策略不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者中,從而降低新任務的流量。它不會在線程池的某個線程中執行新提交的任務,而是調用了execute的線程中執行任務,當線程池的任務隊列被填滿後,下一個任務會在調用execute時在主線程中執行,由於執行任務需要一定時間,這段時間內主線程顯然不能提交任務,從而保證線程池在這段時間內處理現有任務。
- DiscardPolicy:拋棄策略,舍棄任務。
- DiscardOldestPolicy:拋棄下一個將被執行的任務,然後嘗試重新提交新的任務,(不適用於和優先隊列合用,因為這樣將拋棄的將是優先級最高的等待任務)
當然,我們可以使用Semaphore(信號量)來控制任務的提交速率。
線程工廠
我們可以通過自定義線程工廠,來實現我們自己的線程。具體示例如下所示:
自定義的線程工廠,記錄了線程池的名字。
import java.util.concurrent.ThreadFactory; /** * Created by DB on 2017/9/1. */ public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } @Override public Thread newThread(Runnable r) { return new MyAPPThread(r,poolName); } }
自定義的線程類:實現了指定線程的名字,設置自定義UncaughtExecptionHandler。
import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; /** * Created by DB on 2017/9/1. */ public class MyAPPThread extends Thread { public static final String DEFALUT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAPPThread(Runnable r){ this(r,DEFALUT_NAME); } public MyAPPThread(Runnable r,String name){ super(r,name +"-"+created.incrementAndGet()); setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e); } }); } public void run(){ boolean debug = debugLifecycle; if(debug){ log.log(Level.FINE,"Created" +getName()); } try { alive.incrementAndGet(); super.run(); }finally { alive.decrementAndGet(); if(debug){ log.log(Level.FINE,"Exiting"+getName()); } } } public static int getThreadsCreated(){ return created.get(); } public static int getThreadsAlive(){ return alive.get(); } public static boolean getDebug(){ return debugLifecycle; } public static void setDebug(Boolean b){ debugLifecycle=b; } }
擴展ThreadPoolExecutor
ThreadPoolExecutor是可擴展的,它提供了幾個在子類中可以改寫的方法:beforeExecute、afterExecute和terminated,通過實現這些方法我們可以實現擴展。
在執行任務的線程中將調用beforeExecute和afterExecute方法,在這些方法中我們可以添加日誌、計時。監視或者統計信息收集的功能。無論任務是從run中正常返回還是拋出一個異常而返回,afterExecute都會被調用。而beforeExecute拋出RuntimeException時,任務將不被執行,afterExecute當然也不會被調用。
在線程池完成關閉操作時,會調用terminated,也就在所有任務都已經完成並且所有工作者線程也已經關閉後。在這個方法中我們可以實現Executor在其生命周期中分配的各種資源,還有執行發送通知、記錄日誌或者收集finalize統計信息等操作。
具體的框架如下:
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by DB on 2017/9/1. */ public class MyThreadPoolExecutor extends ThreadPoolExecutor { public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); } @Override protected void terminated() { super.terminated(); } }
Java線程池之ThreadPoolExecutor