聊聊高併發(四十二)解析java.util.concurrent各個元件(十八) 任務的批量執行和CompletionService
上一篇講了ExecutorService關於任務的非同步執行和狀態控制的部分,這篇說說關於任務批量執行的部分。ExecutorSerivce中關於批量執行的介面如下
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
invokeAll介面返回一個Future集合,包含了所有的任務的Future物件。invokeAny這個介面輸入一組任務,返回任意一個成功執行的任務的結果,其他的任務被取消。
AbstractExecutorService給出了invokeAll和invokeAny的預設實現。通過程式碼可以看到ExecutorService實現的任務批量執行的邏輯和一些問題。
invokeAll的實現如下:
1. 給每個任務建立RunnableTask結構,這個類是FutureTask的實現類,然後扔給執行緒池去執行execute。
2. 輪詢Future集合,如果任務沒有執行完成!f.done(),就呼叫FutureTask.get()方法,這個方法會阻塞等待直到任務完成或者丟擲異常
3. 當所有任務都執行完成後才返回結果
invokeAll的問題顯然易見,就是必須等待所有任務完成才返回,任務執行期間是無法獲得結果的,如果有些任務耗時很長,有些任務耗時很短,那麼先完成的任務也只能全部任務完成後才能返回。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (Future<T> f : futures) { if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); } }
正是ExecutorService在處理批量任務時必須等待全部任務都完成才能返回結果的問題,引入了CompletionService介面。CompletionService提供了一個完成佇列來解決這個問題。看一下CompletionService的介面定義。可以看到它的功能分為兩部分
1. submit提交任務,返回Future,進行非同步任務的狀態控制
2. take, poll 這兩個佇列操作,前者是阻塞佇列操作,後者可以快速返回,也可以限時操作
CompletionService的take, poll這兩個方法就是對它的完成佇列進行操作,完成的任務進入完成佇列,可以被直接獲取,不用等待其他任務的完成。
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
ExecutorCompletionService實現了CompletionService介面。它有3個屬性
1. private final Executor executor; 實際執行任務的Executor
2. private final AbstractExecutorService aes; 使用它的newTaskFor方法來適配Runnable和Callable任務,統一返回RunnableFuture結構
3. private final BlockingQueue<Future<V>> completionQueue; 存放執行完成的任務的完成佇列,是一個阻塞佇列
內部類QueueingFuture繼承了FutureTask,它的目的是重寫FutureTask的done方法,將完成的任務自動放入完成佇列completionQueue
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
ExecutorCompletionService預設的建構函式裡使用了LinkedBlockingQueue來作阻塞佇列
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
它的submit方法很簡單,就是把提交的任務封裝成QueueingFuture,然後交給Executor執行,
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
take和poll方法也很簡單,直接交給完成佇列completionQueue來執行阻塞佇列的操作
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
看了CompletionService的實現,來看一下如何使用它。AbstractExecutorService的doInvokeAny方法使用了CompletionService。
invokeAny方法是提交一組任務,然後有一個執行成功的任務就可以返回結果,然後取消其他任務。
1. List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 建立一個Future集合來存放任務的Future
2. ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); 建立一個ComletionService
3. futures.add(ecs.submit(it.next())); 先提交一個任務
4. 在無限迴圈中,先看一下任務執行結果 Future<T> f = ecs.poll();
5. 如果f != null,表示已經有任務完成,然後呼叫f.get去取結果,如果能取到,就直接返回結果。如果丟擲異常,則繼續迴圈
6. 如果f == null,表示任務沒有完成,就再提交一個任務。如果是限時操作,就計算一下時間,判斷是否超時
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (Future<T> f : futures)
f.cancel(true);
}
}