執行緒池原理--執行器AbstractExecutorService
阿新 • • 發佈:2018-11-10
文章目錄
執行緒池原理–總索引
執行緒池原理–執行器AbstractExecutorService
AbstractExecutorService
AbstractExecutorService是一個抽象類,實現自ExecutorService介面。該類實現了介面中的一些方法。
Sumit
可以看到,不管提交的是Runnable型別或者Callable型別的任務,最終都是建立FutureTask型別物件,並提交給execute()方法執行,execute()方法由子類實現。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException ();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
批量提交任務
invokeAll 用於批量提交任務,該方法會阻塞,來看一下其實現原理。
如果發生任何異常,所有的任務都會被取消。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
// 迴圈提交任務給execute()
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
//迴圈檢測所有任務是否執行完畢
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
//在這裡阻塞一直到f的任務執行完畢。
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException ignore) {}
catch (ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
}
doInvokeAny也是用於批量提交任務,也會發生阻塞,但是隻要有一個任務執行完畢,那麼便會退出阻塞狀態並返回。
並且如果發生任何異常,所有的任務都會被取消。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
//記錄當前未提交的任務數量
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
//提交第一個任務
futures.add(ecs.submit(it.next()));
//每提交第一個任務,ntasks-1
--ntasks;
int active = 1;
for (;;) {
//檢索並移除下一個要完成的任務
Future<T> f = ecs.poll();
//f == null ,說明還沒有任務要完成,繼續新增任務執行
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
//超時判斷處理
else if (timed) {
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
//f!= null ,說明已經有任務完成,返回結果
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
cancelAll(futures);
}
}