java多執行緒7.使用執行緒池
只有當任務都是同類型並且相互獨立時,執行緒池的效能才能達到最佳。如果將執行時間較長的與執行時間較短的任務混合在一起,那麼除非執行緒池很大,否則將可能造成擁塞,如果提交的任務依賴於其他任務,那麼除非執行緒池無線大,否則將可能造成死鎖。
例如飢餓死鎖:執行緒池中的任務需要無限等待一些必須由池中其他任務才能提供的資源或條件。
ThreadPoolExecutor的通用建構函式:(在呼叫完建構函式之後可以繼續定製ThreadPoolExecutor)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler){ //... }
飽和策略:
ThreadPoolExecutor允許提供一個BlockingQueue來儲存等待執行的任務。
當有界佇列被填滿後,飽和策略開始發揮作用。可以通過呼叫setRejectedExecutionHandler來修改。
中止是預設的飽和策略,該策略將丟擲未檢查的RejectedExecutionException,呼叫者可以捕獲這個異常,然後根據需求編寫自己的處理程式碼。
呼叫者執行策略實現了一種調節機制,該策略既不會拋棄任務,也不會丟擲異常,而是將某些任務回退到呼叫者,從而降低新任務的流量。
例如對於WebServer,當執行緒池中的所有執行緒都被佔用,並且工作佇列被填滿後,下一個任務在呼叫execute時在主執行緒中執行。
由於執行任務需要一定的時間,因此主執行緒至少在一段時間內不能提交任何任務,從而使得工作者執行緒有時間來處理完正在執行的任務。
在這期間,主執行緒不會呼叫accept,因此到達的請求將被儲存在TCP層的佇列中而不是在應用程式的佇列中,如果持續過載,那麼TCP層最終發現它的請求佇列被填滿,同樣會開始拋棄請求。
因此當伺服器過載時,這種過載會逐漸向外蔓延開來---從執行緒池到工作佇列到應用程式再到TCP層,最終到達客戶端,導致伺服器在高負載下實現一種平緩的效能降低。
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
當工作佇列被填滿後,沒有預定於的飽和策略來阻塞execute。而通過Semaphore來現在任務的到達率,可以實現。
/** * 設定訊號量的上界設定為執行緒池的大小加上可排隊任務的數量,控制正在執行和等待執行的任務數量。 */ public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec,int bound){ this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable task) throws InterruptedException{ semaphore.acquire(); try{ exec.execute(new Runnable(){ public void run(){ try{ task.run(); }finally{ semaphore.release(); } } }); }catch(RejectedExecutionException e){ semaphore.release(); } } }
執行緒工廠
執行緒池配置資訊中可以定製執行緒工廠,在ThreadFactory中只定義了一個方法newThread,每當執行緒池需要建立一個新執行緒時都會呼叫這個方法。
public interface ThreadFactory{ Thread newThread(Runnable r); }
// 示例:將一個特定於執行緒池的名字傳遞給MyThread的建構函式,從而可以再執行緒轉儲和錯誤日誌資訊中區分來自不同執行緒池的執行緒。 public class MyThreadFactory implements ThreadFactory{ private final String poolName; public MyThreadFactory(String poolName){ this.poolName = poolName; } public Thread newThread(Runnable runnable){ return new MyThread(runnable,poolName); } }
// 示例:為執行緒指定名字,設定自定義UncaughtExceptionHandler向Logger中寫入資訊及維護一些統計資訊以及線上程被建立或者終止時把除錯訊息寫入日誌。 public class MyThread extends Thread{ public static final String default_name = "myThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyThread(Runnable runnable){ this(runnable,default_name); } public MyThread(Runnable runnable, String defaultName) { super(runnable,defaultName + "-" + created.incrementAndGet()); setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE,"uncaught in thread " + t.getName(), e); } }); } public void run(){ boolean debug = debugLifecycle; if(debug){ log.log(Level.FINE,"created " + getName()); } try{ alive.incrementAndGet(); super.run(); }finally{ alive.decrementAndGet(); if(debug){ log.log(Level.FINE,"Exiting " + getName()); } } } }
擴充套件ThreadPoolExecutor
線上程池完成關閉操作時呼叫terminated,也就是在所有任務都已經完成並且所有工作者執行緒也已經關閉後。terminated可以用來釋放Executor在其生命週期裡分配的各種資源,此外還可以執行傳送通知、記錄日誌或者收集finalize統計資訊等操作。
- 示例:給執行緒池新增統計資訊
/** * TimingThreadPool中給出了一個自定義的執行緒池,通過beforeExecute、afterExecute、terminated等方法來新增日誌記錄和統計資訊收集。 * 為了測量任務的執行時間,beforeExecute必須記錄開始時間並把它儲存到一個afterExecute可用訪問的地方。 * 因為這些方法將在執行任務的執行緒中呼叫,因此beforeExecute可以把值儲存到一個ThreadLocal變數中。然後由afterExecute來取。 * 在TimingThreadPool中使用了兩個AtomicLong變數,分別用於記錄已處理的任務和總的處理時間,並通過包含平均任務時間的日誌訊息。 */ public class TimingThreadPool extends ThreadPoolExecutor{ public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } private final ThreadLocal<Long> startTime = new ThreadLocal<Long>(); private final Logger log = Logger.getLogger("TimingThreadPool"); private final AtomicLong numTasks = new AtomicLong(); private final AtomicLong totalTime = new AtomicLong(); protected void beforeExecute(Thread t,Runnable r){ super.beforeExecute(t, r); log.fine(String.format("Thread %s: start %s", t,r)); startTime.set(System.nanoTime()); } protected void afterExecute(Throwable t,Runnable r){ try{ long endTime = System.nanoTime(); long taskTime = endTime - startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); log.fine(String.format("Thread %s: end %s, time=%dns", t,r,taskTime)); }finally{ super.afterExecute(r, t); } } protected void terminated(){ try{ log.info(String.format("Terminated: avg time=%dns", totalTime.get()/numTasks.get())); }finally{ super.terminated(); } } }
#筆記內容來自 《java併發程式設計實戰》