讀書筆記(java併發程式設計實戰——CompletionService)
原文請參考微信公眾號(歡迎關注公眾號:coding_song):https://mp.weixin.qq.com/s/R50Eh4kTDtA031i-yMUZAw
Callable&Future
Callbale描述的是抽象的計算任務,有明確的起點,並且最終會結束;
@FunctionalInterface public interface Callable<V>{ V call()throwsException; }
Future表示一個任務的生命週期 ,並提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。Future的get方法取決於任務的狀態(尚未開始、正在執行、已完成),如果任務已經完成,get會立即返回結果或丟擲一個異常;如果任務沒有完成,則get將阻塞直到任務完成返回結果;如果任務被取消,則get丟擲CancellationException;
Future的ge(long var1, TimeUnit var3)方法,可以設定超時時間,超時後可以做一些預設的處理,比如頁面上展示廣告資訊,當獲取某個廣告時獲取超時了,超時異常處理時可以設定一個預設廣告位,而不至於什麼都不顯示
public interface Future<V>{ boolean cancel(boolean var1); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException; V get(long var1,TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException; }
CompletionService
CompletionService將Executor和BlockingQueue的功能融合在一起,將Callable任務提交給CompletionService來執行,然後使用類似於佇列操作的take和poll等方法來獲得已完成的結果,而這些結果會在完成是被封裝為Future
public interface CompletionService<V>{ Future<V> submit(Callable<V> var1); Future<V> submit(Runnable var1, V var2); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long var1,TimeUnit var3) throws InterruptedException; }
ExcutorCompletionService實現了CompletionService,在建構函式中建立一個BlockingQueue來儲存計算完成的結果,當計算完成時,呼叫FutureTask的done方法,將完成的結果新增到BlockingQueue中;佇列的take和poll方法在得出結果之前是阻塞的
public class ExecutorCompletionService<V> implements CompletionService<V>{ private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * FutureTask extension to enqueue upon completion */ private class QueueingFuture extends FutureTask<Void>{ QueueingFuture(RunnableFuture<V> task){ super(task,null); this.task = task; } protected void done(){ completionQueue.add(task);} private final Future<V> task; } public Future<V> take()throws InterruptedException{ return completionQueue.take(); } public Future<V> poll(){ return completionQueue.poll(); } public Future<V> poll(long timeout,TimeUnit unit) throws InterruptedException{ return completionQueue.poll(timeout, unit); } // 省略其他方法 }
CompletionService的使用:建立n個任務,將其提交到一個執行緒池,保留n個Future,可使用限時的get方法通過Future序列地獲取每個結果;
public class CompletionServiceTest{ private final ExecutorService executorService; private final static Integer COUNT =10; public CompletionServiceTest(ExecutorService executorService){ this.executorService = executorService; } public void test()throws InterruptedException,ExecutionException{ CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService); for(int i =0; i < COUNT; i++){ int finalI = i; completionService.submit(newCallable(){ @Override public Object call()throws Exception{ return"done"+ finalI; } }); } for(int i =0; i < COUNT; i++){ Future<Object> future = completionService.take(); Object object = future.get(); } } }
上述列子描述多個ExecutorCompletionService共享一個executorService,CompletionService的作用就相當於一組計算的控制代碼,與Future作為單個計算的控制代碼是非常類似的。通過記錄提交給CompletionService的任務數量,並計算出已經獲取的已完成結果的數量,及時使用一個共享的ExecutorService,也能知道已經獲得了所有任務結果的時間
CompletionService應用場景:
(1)動態載入資料、下載圖片等,一旦佇列中有了資料,就可以陸續返回載入到的資料,不需要等到所有資料都載入完成才返回;滾動網頁顯示載入圖片可以用其實現。
(2)從不同資料來源載入資料,一個ExecutorCompletionService從一個數據源中獲取資料,然後通過各個ExecutorCompletionService返回的結果,再做資料整合
(3)多執行緒並行處理資料,可以大大提高程式處理時間,提高效能