1. 程式人生 > >讀書筆記(java併發程式設計實戰——CompletionService)

讀書筆記(java併發程式設計實戰——CompletionService)

原文請參考微信公眾號(歡迎關注公眾號:coding_song):https://mp.weixin.qq.com/s/R50Eh4kTDtA031i-yMUZAw 

 

Callable&Future

Callbale描述的是抽象的計算任務,有明確的起點,並且最終會結束;

 

@FunctionalInterface

public interface Callable<V>{

    V call()throwsException;

}

 

 

Future表示一個任務的生命週期 ,並提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。Future的get方法取決於任務的狀態(尚未開始、正在執行、已完成),如果任務已經完成,get會立即返回結果或丟擲一個異常;如果任務沒有完成,則get將阻塞直到任務完成返回結果;如果任務被取消,則get丟擲CancellationException;

Future的ge(long var1, TimeUnit var3)方法,可以設定超時時間,超時後可以做一些預設的處理,比如頁面上展示廣告資訊,當獲取某個廣告時獲取超時了,超時異常處理時可以設定一個預設廣告位,而不至於什麼都不顯示

public interface Future<V>{

    boolean cancel(boolean var1);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException,ExecutionException;

    V get(long var1,TimeUnit var3) throws InterruptedException,ExecutionException,TimeoutException;

}

CompletionService

CompletionService將Executor和BlockingQueue的功能融合在一起,將Callable任務提交給CompletionService來執行,然後使用類似於佇列操作的take和poll等方法來獲得已完成的結果,而這些結果會在完成是被封裝為Future

 

public interface CompletionService<V>{

    Future<V> submit(Callable<V> var1);

    Future<V> submit(Runnable var1, V var2);

    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long var1,TimeUnit var3) throws InterruptedException;

}
 

 

ExcutorCompletionService實現了CompletionService,在建構函式中建立一個BlockingQueue來儲存計算完成的結果,當計算完成時,呼叫FutureTask的done方法,將完成的結果新增到BlockingQueue中;佇列的take和poll方法在得出結果之前是阻塞的

public class ExecutorCompletionService<V> implements CompletionService<V>{

    private final Executor executor;

    private final AbstractExecutorService aes;

    private final BlockingQueue<Future<V>> completionQueue;


    /**

     * FutureTask extension to enqueue upon completion

     */

    private class QueueingFuture extends FutureTask<Void>{

        QueueingFuture(RunnableFuture<V> task){

            super(task,null);

            this.task = task;

        }

        protected void done(){ completionQueue.add(task);}

        private final Future<V> task;

    }

    public Future<V> take()throws InterruptedException{

        return completionQueue.take();

    }

    public Future<V> poll(){

        return completionQueue.poll();

    }

    public Future<V> poll(long timeout,TimeUnit unit)

            throws InterruptedException{

        return completionQueue.poll(timeout, unit);

    }

    // 省略其他方法

}

 

CompletionService的使用:建立n個任務,將其提交到一個執行緒池,保留n個Future,可使用限時的get方法通過Future序列地獲取每個結果;

 

public class CompletionServiceTest{


    private final ExecutorService executorService;


    private final static Integer COUNT =10;


    public CompletionServiceTest(ExecutorService executorService){

        this.executorService = executorService;

    }


    public void test()throws InterruptedException,ExecutionException{

        CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService);

        for(int i =0; i < COUNT; i++){

            int finalI = i;

            completionService.submit(newCallable(){

                @Override

                public Object call()throws Exception{

                    return"done"+ finalI;

                }

            });

        }

        for(int i =0; i < COUNT; i++){

            Future<Object> future = completionService.take();

            Object object = future.get();

        }
 

    }

}

 

上述列子描述多個ExecutorCompletionService共享一個executorService,CompletionService的作用就相當於一組計算的控制代碼,與Future作為單個計算的控制代碼是非常類似的。通過記錄提交給CompletionService的任務數量,並計算出已經獲取的已完成結果的數量,及時使用一個共享的ExecutorService,也能知道已經獲得了所有任務結果的時間

CompletionService應用場景:

(1)動態載入資料、下載圖片等,一旦佇列中有了資料,就可以陸續返回載入到的資料,不需要等到所有資料都載入完成才返回;滾動網頁顯示載入圖片可以用其實現。

(2)從不同資料來源載入資料,一個ExecutorCompletionService從一個數據源中獲取資料,然後通過各個ExecutorCompletionService返回的結果,再做資料整合

(3)多執行緒並行處理資料,可以大大提高程式處理時間,提高效能