執行緒學習二:執行緒池執行Runnable與Callable
1、瞭解執行緒池幾個類之間的關係 (結合圖看後面的分析)
FutureTask 繼承 RunnableFuture , RunnableFuture 實現介面 Runnable
2、分析常用呼叫執行緒池程式碼(以下稱為程式碼A)
ExecutorService es = Executors.newFixedThreadPool(15); es.submit(new Callable<String>(){ @Override public String call() throws Exception { return null; }}); es.submit(new myThread());
1)首先我們看看Executors.newFixedThreadPool()這個方法的原始碼
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
可以看到,它實際返回的是ThreadPoolExecutor物件es
2)再看看es.submit()這個方法是如何實現的
在原始碼中,我們會發現ThreadPoolExecutor並沒有方法submit,它是呼叫了父類abstractExecutorService.submit()方法,原始碼如下
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可以看到,abstractExecutorService是把Callable<T> 物件和 Runnable物件轉換為了RunnbaleFuture<T>的實現類 FutureTask<T>,然後呼叫execute()方法。
而ThreadPoolExecutor類中是重寫了execute()方法的,所以程式碼A中的es.submit實際上執行的是ThreadPoolExecutor.executor().
3)瞭解Callable 與 Runnable轉換為FutureTask<T> (這裡不截原始碼了) 與 FutureTask的run方法。
FutureTask<T>中有個Callable<T>屬性,如果是Callable轉,那麼直接賦值就好了,如果是Runnable轉,那麼Eexcutors中有個類RunnableAdapter,將Runnable物件賦值給一個RunnableAdapter物件ra,然後把ra賦值給FutureTask中的Callable屬性就好了。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
FutureTask<T>通過父類RunnableFuture也是實現了Runnable介面的 , 所以執行也是start方法 。 run原始碼
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
結果還是把Callable的call()方法丟到了Runnable的run()方法中 ,看到裡面的set(result)沒有,就是通過一個全域性變數,變相的返回執行緒執行結果,獲取是FutureTask.get()方法。
4)再看看ThreadPoolExecutor.executor()是如何實現的 ,具體後續分析
<span style="color:#000000;"> public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}</span>
上面的addWorker(COMMAND,FLAG)方法中,滿足條件會構建new Thread(FutureTask) t,呼叫t.start();
邏輯圖
5)終於到最後了,可以說我最想說的話了 。
之前沒看程式碼,狗日的以為Callable是個很神奇的東西,就是Thread的兄弟,使用起來也挺神祕,只見過再ExecutorService.submit()見到過 。
現在才知道,Callable僅僅只是為執行緒執行的時候能夠返回物件,且僅僅只能在ExecutorService.submit()中使用,其他無卵用。
繞來繞去還是Thread與Runnable的start方法與run方法啊。