FutureTask線上程池中應用和原始碼解析
FutureTask 是一個支援取消的非同步處理器,一般線上程池中用於非同步接受callable返回值。
主要實現分三部分:
- 封裝 Callable,然後放到執行緒池中去非同步執行->run。
- 獲取結果-> get。
- 取消任務-> cancel。
接下來主要學習下該模型如何實現。
舉例說明FutureTask線上程池中的應用
// 第一步,定義執行緒池, ExecutorService executor = new ThreadPoolExecutor( minPoolSize, maxPollSize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<>()); // 第二步,放到執行緒池中執行,返回FutureTask FutureTasktask = executor.submit(callable); // 第三步,獲取返回值 T data = task.get();
學習FutureTask實現
類屬性
//以下是FutureTask的各種狀態 private volatile int state; private static final int NEW= 0; private static final int COMPLETING= 1; private static final int NORMAL= 2; private static final int EXCEPTIONAL= 3; private static final int CANCELLED= 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED= 6; private Callable<V> callable; //執行的任務 private Object outcome; //儲存結果或者異常 private volatile Thread runner;//執行callable的執行緒 private volatile WaitNode waiters; //呼叫get方法等待獲取結果的執行緒棧 其中各種狀態存在 最終狀態 status>COMPLETING 1)NEW -> COMPLETING -> NORMAL(有正常結果) 2) NEW -> COMPLETING -> EXCEPTIONAL(結果為異常) 3) NEW -> CANCELLED(無結果) 4) NEW -> INTERRUPTING -> INTERRUPTED(無結果)
類方法
從上面舉例說明開始分析。
run()方法
FutureTask 繼承 Runnable,ExecutorService submit 把提交的任務封裝成 FutureTask 然後放到執行緒池 ThreadPoolExecutor 的 execute 執行。
public void run() { //如果不是初始狀態或者cas設定執行執行緒是當前執行緒不成功,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執行callable任務 這裡對異常進行了catch result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 封裝異常到outcome } if (ran) set(result); } } finally { runner = null; int s = state; // 這裡如果是中斷中,設定成最終狀態 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
以上是 run 方法原始碼實現很簡單,解析如下:
- 如果不是始狀態或者 cas 設定執行執行緒是當前執行緒不成功,直接返回,防止多個執行緒重複執行。
- 執行 Callable 的 call(),即提交執行任務(這裡做了catch,會捕獲執行任務的異常封裝到 outcome 中)。
- 如果成功執行 set 方法,封裝結果。
set 方法
protected void set(V v) { //cas方式設定成completing狀態,防止多個執行緒同時處理 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // 封裝結果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設定成normal狀態 finishCompletion(); } }
解析如下:
- cas方式設定成completing狀態,防止多個執行緒同時處理
- 封裝結果到outcome,然後設定到最終狀態normal
- 執行finishCompletion方法。
finishCompletion方法
// state > COMPLETING; 不管異常,中斷,還是執行完成,都需要執行該方法來喚醒呼叫get方法阻塞的執行緒 private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // cas 設定waiters為null,防止多個執行緒執行。 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 迴圈喚醒所有等待結果的執行緒 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //喚醒執行緒 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //該方法為空,可以被重寫 done(); callable = null;// to reduce footprint }
解析如下:
遍歷waiters中的等待節點,並通過 LockSupport 喚醒每一個節點,通知每個執行緒,該任務執行完成(可能是執行完成,也可能 cancel,異常等)。
以上就是執行的過程,接下來分析獲取結果的過程->get。
get 方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
解析如下:
以上兩個方法,原理一樣,其中一個設定超時時間,支援最多阻塞多長時間。
狀態如果小於 COMPLETING,說明還沒到最終狀態,(不管是否是成功、異常、取消)。
呼叫 awaitDone 方法阻塞執行緒,最終呼叫 report 方法返回結果。
awaitDone 方法
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //執行緒可中斷,如果當前阻塞獲取結果執行緒執行interrupt()方法,則從佇列中移除該節點,並丟擲中斷異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果已經是最終狀態,退出返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //這裡做了個優化,competiting到最終狀態時間很短,通過yield比掛起響應更快。 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 初始化該阻塞節點 else if (q == null) q = new WaitNode(); // cas方式寫到阻塞waiters棧中 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 這裡做阻塞時間處理。 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞執行緒,有超時時間 LockSupport.parkNanos(this, nanos); } else // 阻塞執行緒 LockSupport.park(this); } }
解析如下:
整體流程已寫到註解中,整體實現是放在一個死迴圈中,唯一出口,是達到最終狀態。
然後是構建節點元素,並將該節點入棧,同時阻塞當前執行緒等待執行主任務的執行緒喚醒該節點。
report 方法
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
然後是report方法,如果是正常結束,返回結果,如果不是正常結束(取消,中斷)丟擲異常。
最後分析下取消流程。
cancel 方法
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try {// in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
解析如下:
mayInterruptIfRunning引數是是否允許執行中被中斷取消。
- 根據入參是否為true,CAS設定狀態為INTERRUPTING或CANCELLED,設定成功,繼續第二步,否則直接返回false。
- 如果允許執行中被中斷取消,呼叫runner.interupt()進行中斷取消,設定狀態為INTERRUPTED。
- 喚醒所有在get()方法等待的執行緒。
此處有兩種狀態轉換:
- 如果mayInterruptIfRunning為true:status狀態轉換為 new -> INTERRUPTING->INTERRUPTED。主動去中斷執行執行緒,然後喚醒所有等待結果的執行緒。
- 如果mayInterruptIfRunning為false:status狀態轉換為 new -> CANCELLED。
不會去中斷執行執行緒,直接喚醒所有等待結果的執行緒,從 awaitDone 方法中可以看到,喚醒等待執行緒後,直接從跳轉回 get 方法,然後把結果返回給獲取結果的執行緒,當然此時的結果是 null。
總結
以上就是 FutureTask 的原始碼簡單解析,實現比較簡單,FutureTask 就是一個實現 Future 模式,支援取消的非同步處理器。