java 中的 Executors 簡介與多執行緒在網站上逐步優化的運用案例
提供Executor的工廠類

忽略了自定義的ThreadFactory、callable和unconfigurable相關的方法
-
newFixedxxx:在任意時刻,最多有nThreads個執行緒在處理task;如果所有執行緒都在執行時來了新的任務,它會被扔入佇列;如果有執行緒在執行期間因某種原因終止了執行,如果需要執行後續任務,新的執行緒將取代它
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 複製程式碼
-
newCachedxxx:新任務到來如果執行緒池中有空閒的執行緒就複用,否則新建一個執行緒。如果一個執行緒超過60秒沒有使用,它就會被關閉移除執行緒池
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 複製程式碼
-
newSingleThreadExecutor:僅使用一個執行緒來處理任務,如果這執行緒掛了,會產生一個新的執行緒來代替它。每一個任務被保證按照順序執行,而且一次只執行一個
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 複製程式碼
使用newFixedxxx方法也能實現類似的作用,但是ThreadPoolExecutor會提供修改執行緒數的方法,FinalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎 上僅提供了執行finalize時候去關閉執行緒,而DelegatedExecutorService僅暴漏ExecutorService自身的方法
-
newScheduledThreadPool:提供一個執行緒池來延遲或者定期執行任務
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } 複製程式碼
-
newSingleThreadScheduledExecutor:提供單個執行緒來延遲或者定期執行任務,如果執行的執行緒掛了,會生成新的。
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); 複製程式碼
同樣,它保證返回的Executor自身的執行緒數不可修改
從上述的實現可以看出,核心在於三個部分
- ThreadPoolExecutor:提供執行緒數相關的控制
- DelegatedExecutorService:僅暴露ExecutorService自身的方法,保證執行緒數不變來實現語義場景
- ScheduledExecutorService:提供延遲或者定期執行的功能
對應的,相應也有不同的佇列去實現不同的場景
- LinkedBlockingQueue:無界阻塞佇列
- SynchronousQueue:沒有消費者消費時,新的任務就會被阻塞
- DelayQueue:佇列中的任務過期之後才可以執行,否則無法查詢到佇列中的元素
DelegatedExecutorService
它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執行,所謂的UnConfigurable實際也就是它沒有暴漏配置各種引數調整的方法
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } 複製程式碼
ScheduledExecutorService
提供一系列的schedule方法,使得任務可以延遲或者週期性的執行,對應schedule方法會返回ScheduledFuture以供確認是否執行以及是否要取消。它的實現ScheduledThreadPoolExecutor也支援立即執行由submit提交的任務
僅支援相對延遲時間,比如距離現在5分鐘後執行。類似Timer也可以管理延遲任務和週期任務,但是存在一些缺陷:
ScheduledExecutorService的多執行緒機制可彌補 1:可以使用try-catch-finally對相應執行快處理;2:通過execute執行的方法可以設定UncaughtExceptionHandler來接收未捕獲的異常,並作出處理;3:通過submit執行的,將被封裝層ExecutionException重新丟擲
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 複製程式碼
- corePoolSize、maximumPoolSize:ThreadPoolExecutor會根據這兩自動調整執行緒池的大小,當一個新任務通過execute提交的時候:
如果當前執行的執行緒數小於corePoolSize就新建執行緒;
如果當前執行緒數在corePoolSize與maximumPoolSize之間,則只有在佇列滿的時候才會建立新的執行緒;
如果已經達到最大執行緒數,並且佇列都滿了,在這種飽和狀態下就會執行拒絕策略預設情況下,只有新任務到達的時候才會啟動執行緒,可通過prestartCoreThread方法實現事先啟動
- corePoolSize:預設執行緒池所需要維護的最小的worker的數量,就算是worker過期了也會保留。如果想要不保留,則需要設定allowCoreThreadTimeOut,此時最小的就是0
- maximumPoolSize:執行緒池最大的執行緒數。java限制最多為 2^29-1,大約5億個
- keepAliveTime、unit:如果當前執行緒池有超過corePoolSize的執行緒數,只要有執行緒空閒時間超過keepAliveTime的設定,就會被終止;unit則是它的時間單位
- workQueue:任何BlockingQueue都可以使用,基本上有三種
- Direct handoffs,直接交付任務。比如 SynchronousQueue,如果沒有執行緒消費,提交任務會失敗,當然可以新建一個執行緒來處理。它適合處理有依賴關係的任務,一般它的maximumPoolSizes會被設定成最大的
- Unbounded queues,無界佇列。比如LinkedBlockingQueue,這意味著如果有corePoolSize個執行緒在執行,那麼其他的任務都只能等待。它適合於處理任務都是互相獨立的,
- Bounded queues,有界佇列。比如ArrayBlockingQueue,需要考慮佇列大小和最大執行緒數之間的關係,來達到更好的資源利用率和吞吐量
- threadFactory:沒有指定的時候,使用
Executors.defaultThreadFactory
- RejectedExecutionHandler:通過execute新增的任務,如果Executor已經關閉或者已經飽和了(執行緒數達到了maximumPoolSize,並且佇列滿了),就會執行,java提供了4種策略:
- AbortPolicy,拒絕的時候丟擲執行時異常RejectedExecutionException;
- CallerRunsPolicy,如果executor沒有關閉,那麼由呼叫execute的執行緒來執行它;
- DiscardPolicy,直接扔掉新的任務;
- DiscardOldestPolicy,如果executor沒有關閉,那麼扔掉佇列頭部的任務,再次嘗試;
ThreadPoolExecutor可自定義beforeExecutor、afterExecutor可以用來新增日誌統計、計時、件事或統計資訊收集功能,無論run是正常返回還是丟擲異常,afterExecutor都會被執行。如果beforeExecutor丟擲RuntimeException,任務和afterExecutor都不會被執行。terminated在所有任務都已經完成,並且所有工作者執行緒關閉後會呼叫,此時也可以用來執行傳送通知、記錄日誌等等。
如何估算執行緒池的大小
- 計算密集型,通常在擁有 個處理器的系統上,執行緒池大小設定為 能夠實現最優的利用率;
cpu的個數
- I/O密集型或者其它阻塞型的任務,定義 為CPU的個數, 為CPU的利用率, 為等待時間與計算時間的比率,此時執行緒池的最優大小為
場景說明
將一個網站的業務抽象成如下幾塊
- 接收客戶端請求與處理請求
- 頁面渲染返回的文字和圖片
- 獲取頁面的廣告
接收請求與處理請求
理論模型
理論上,服務端通過實現約定的介面就可以實現接收請求和處理連續不斷的請求過來
ServerSocket socket = new ServerSocket(80); while(true){ Socket conn = socket.accept(); handleRequest(conn) } 複製程式碼
缺點:每次只能處理一個請求,新請求到來時,必須等到正在處理的請求處理完成,才能接收新的請求
顯示的建立多執行緒
為每個請求建立新的執行緒提供服務
ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } new Thread(task).start(); } 複製程式碼
缺點:
- 執行緒的建立和銷燬都有一定的開銷,延遲對請求的處理;
- 建立後的執行緒多於可用處理器的數量,造成執行緒閒置,這會給垃圾回收帶來壓力
- 存活的大量執行緒競爭CPU資源會產生很多效能開銷
- 系統上對可建立的執行緒數存在限制
使用執行緒池
使用java自帶的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100); ... ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } exec.execute(task); } ... 複製程式碼
執行緒池策略通過實現預估好的執行緒需求,限制併發任務的數量,重用現有的執行緒,解決每次建立執行緒的資源耗盡、競爭過於激烈和頻繁建立的問題,也囊括了執行緒的優勢,解耦了任務提交和任務執行。
頁面渲染返回的文字和圖片
序列渲染
renderText(source); List<ImageData> imageData = new ArrayList<ImageData>(); for(ImageInfo info:scaForImageInfo(source)){ imageData.add(info.downloadImage()); } for(ImageData data:imageData){ renderImage(data); } 複製程式碼
缺點:影象的下載大部分時間在等待I/O操作執行完成,這期間CPU幾乎不做任何工作,使得使用者看到最終頁面之前要等待過長的時間
並行化
渲染過程可以分成兩個部分,1是渲染文字,1是下載影象
private static final ExecutorService exec = Executors.newFixedThreadPool(100); ... final List<ImageInfo> infos=scaForImageInfo(source); Callable<List<ImageData>> task=new Callable<List<ImageData>>(){ public List<ImageData> call(){ List<ImageData> r = new ArrayList<ImageData>(); for(ImageInfo info:infos){ r.add(info.downloadImage()); } return r; } }; Future<List<ImageData>> future = exec.submit(task); renderText(source); try{ List<ImageData> imageData = future.get(); for(ImageData data:imageData){ renderImage(data); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); future.cancel(true); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); } 複製程式碼
使用Callable來返回下載的圖片結果,使用future來獲得下載的圖片,這樣將減少使用者所需要的等待時間。
缺點:圖片的下載很明顯時間要比文字要慢,這樣的並行化很可能速度可能只提升了1%
並行效能提升
使用CompletionService。
private static final ExecutorService exec; ... final List<ImageInfo> infos=scaForImageInfo(source); CompletionService<ImageData> cService =new ExecutorCompletionService<ImageData>(exec); for(final ImageInfo info:infos){ cService.submit(new Callable<ImageData>(){ public ImageData call(){ return info.downloadImage(); } }); } renderText(source); try{ for(int i=0,n=info.size();t<n;t++){ Future<ImageData> f = cService.take(); ImageData imageData=f.get(); renderImage(imageData) } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); } 複製程式碼
核心思路為為每一幅影象下載都建立一個獨立的任務,並在執行緒池中執行他們,從而將序列的下載過程轉換為並行的過程
獲取頁面的廣告
廣告展示如果在一定的時間以內沒有獲取,可以不再展示,並取消超時的任務。
ExecutorService exe = Executors.newFixedThreadPool(3); List<MyTask> myTasks = new ArrayList<>(); for (int i=0;i<3;i++){ myTasks.add(new MyTask(3-i)); } try { List<Future<String>> futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS); for (int i=0;i<futures.size();i++){ try { String s = futures.get(i).get(); System.out.println("task execut "+myTasks.get(i).getSleepSeconds()+" s"); } catch (ExecutionException e) { System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute "); }catch (CancellationException e){ System.out.println("task sleep "+myTasks.get(i).getSleepSeconds()+" not execute ,because "+e); } } } catch (InterruptedException e) { e.printStackTrace(); } exe.shutdown(); 複製程式碼
invokeAll方法對於沒有完成的任務會被取消,通過CancellationException可以捕獲,invokeAll返回的序列順序和傳入的task保持一致。結果如下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException task sleep 2 not execute ,because java.util.concurrent.CancellationException task execut 1 s 複製程式碼