1. 程式人生 > >獲取Executor提交的併發執行的任務返回結果的兩種方式/ExecutorCompletionService使用

獲取Executor提交的併發執行的任務返回結果的兩種方式/ExecutorCompletionService使用

當我們通過Executor提交一組併發執行的任務,並且希望在每一個任務完成後能立即得到結果,有兩種方式可以採取:

方式一:

通過一個list來儲存一組future,然後在迴圈中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致後面先完成的任務的結果沒有及時獲取的情況,那麼在呼叫get方式時,需要將超時時間設定為0 

  1. public class CompletionServiceTest {  
  2.     static class Task implements Callable<String>{  
  3.         private int i;  
  4.         public Task(int i){  
  5.             this.i = i;  
  6.         }  
  7.         @Override  
  8.         public String call() throws Exception {  
  9.             Thread.sleep(10000);  
  10.             return Thread.currentThread().getName() + "執行完任務:" + i;  
  11.         }     
  12.     }  
  13.     public static void main(String[] args){  
  14.         testUseFuture();  
  15.     }  
  16.     private static void testUseFuture(){  
  17.         int numThread = 5;  
  18.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  19.         List<Future<String>> futureList = new ArrayList<Future<String>>();  
  20.         for(int i = 0;i<numThread;i++ ){  
  21.             Future<String> future = executor.submit(new CompletionServiceTest.Task(i));  
  22.             futureList.add(future);  
  23.         }  
  24.         while(numThread > 0){  
  25.             for(Future<String> future : futureList){  
  26.                 String result = null;  
  27.                 try {  
  28.                     result = future.get(0, TimeUnit.SECONDS);  
  29.                 } catch (InterruptedException e) {  
  30.                     e.printStackTrace();  
  31.                 } catch (ExecutionException e) {  
  32.                     e.printStackTrace();  
  33.                 } catch (TimeoutException e) {  
  34.                     //超時異常直接忽略  
  35.                 }  
  36.                 if(null != result){  
  37.                     futureList.remove(future);  
  38.                     numThread--;  
  39.                     System.out.println(result);  
  40.                     //此處必須break,否則會丟擲併發修改異常。(也可以通過將futureList宣告為CopyOnWriteArrayList型別解決)  
  41.                     break;  
  42.                 }  
  43.             }  
  44.         }  
  45.     }  
  46. }  

 方式二:

第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到程式碼最簡化的效果。

  1. public class CompletionServiceTest {  
  2.     static class Task implements Callable<String>{  
  3.         private int i;  
  4.         public Task(int i){  
  5.             this.i = i;  
  6.         }  
  7.         @Override  
  8.         public String call() throws Exception {  
  9.             Thread.sleep(10000);  
  10.             return Thread.currentThread().getName() + "執行完任務:" + i;  
  11.         }     
  12.     }  
  13.     public static void main(String[] args) throws InterruptedException, ExecutionException{  
  14.         testExecutorCompletionService();  
  15.     }  
  16.     private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{  
  17.         int numThread = 5;  
  18.         ExecutorService executor = Executors.newFixedThreadPool(numThread);  
  19.         CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);  
  20.         for(int i = 0;i<numThread;i++ ){  
  21.             completionService.submit(new CompletionServiceTest.Task(i));  
  22.         }  
  23. }  
  24.         for(int i = 0;i<numThread;i++ ){       
  25.             System.out.println(completionService.take().get());  
  26.         }  
  27.     }  

ExecutorCompletionService分析:

 CompletionService是Executor和BlockingQueue的結合體。

  1. public ExecutorCompletionService(Executor executor) {  
  2.         if (executor == null)  
  3.             throw new NullPointerException();  
  4.         this.executor = executor;  
  5.         this.aes = (executor instanceof AbstractExecutorService) ?  
  6.             (AbstractExecutorService) executor : null;  
  7.         this.completionQueue = new LinkedBlockingQueue<Future<V>>();  
  8.     }  

 任務的提交和執行都是委託給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,

  1. public Future<V> submit(Callable<V> task) {  
  2.         if (task == null) throw new NullPointerException();  
  3.         RunnableFuture<V> f = newTaskFor(task);  
  4.         executor.execute(new QueueingFuture(f));  
  5.         return f;  
  6.     }  

 QueueingFuture是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

  1. private class QueueingFuture extends FutureTask<Void> {  
  2.         QueueingFuture(RunnableFuture<V> task) {  
  3.             super(task, null);  
  4.             this.task = task;  
  5.         }  
  6.         protected void done() { completionQueue.add(task); }  
  7.         private final Future<V> task;  
  8.     }  

 而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。

  1. public Future<V> take() throws InterruptedException {  
  2.     return completionQueue.take();  
  3. }  
  4. public Future<V> poll() {  
  5.     return completionQueue.poll();  
  6. 原文:http://xw-z1985.iteye.com/blog/1997077