1. 程式人生 > >java.util.concurrent.FutureTask 原始碼

java.util.concurrent.FutureTask 原始碼

執行緒池相關

原始碼:

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;


    private Callable<V> callable;

    private Object outcome;

    private volatile Thread runner;

    private volatile WaitNode waiters;


    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }


    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    public void run() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {

            runner = null;

            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }


    protected void done() {
    }


    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }


    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            finishCompletion();
        }
    }


    protected boolean runAndReset() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call();
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {

            runner = null;

            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    }


    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }


    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (; ; ) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            } else
                LockSupport.park(this);
        }
    }


    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (; ; ) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe方法
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

類 FutureTask<V>

    型別引數:

    V - 此 FutureTask 的 get 方法所返回的結果型別。

    所有已實現的介面:

    RunnableFuture<V>, RunnableFuture<V>

    可取消的非同步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 

Future 的基本實現。

    僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。

    可使用 FutureTask 包裝 Callable 或 Runnable 物件。因為 FutureTask 實現了 Runnable,所以可將 FutureTask 提交給 Executor 執行。

    除了作為一個獨立的類外,此類還提供了 protected 功能,這在建立自定義任務類時可能很有用。

 

構造方法摘要

 

FutureTask(Callable<V> callable) 
          建立一個 FutureTask,一旦執行就執行給定的 Callable。
FutureTask(Runnable runnable, V result) 
          建立一個 FutureTask,一旦執行就執行給定的 Runnable,並安排成功完成時 get 返回給定的結果 。

 方法摘要

 boolean cancel(boolean mayInterruptIfRunning) 
          試圖取消對此任務的執行。
protected  void done() 
          當此任務轉換到狀態 isDone(不管是正常地還是通過取消)時,呼叫受保護的方法。
 V get() 
          如有必要,等待計算完成,然後獲取其結果。
 V get(long timeout, TimeUnit unit) 
          如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。
 boolean isCancelled() 
          如果在任務正常完成前將其取消,則返回 true。
 boolean isDone() 
          如果任務已完成,則返回 true。
 void run() 
          除非已將此 Future 取消,否則將其設定為其計算的結果。
protected  boolean runAndReset() 
          執行計算而不設定其結果,然後將此 Future 重置為初始狀態,如果計算遇到異常或已取消,則該操作失敗。
protected  void set(V v) 
          除非已經設定了此 Future 或已將其取消,否則將其結果設定為給定的值。
protected  void setException(Throwable t) 
          除非已經設定了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。

 從類 java.lang.Object 繼承的方法

cloneequalsfinalizegetClasshashCodenotifynotifyAlltoStringwaitwaitwait

 

FutureTask

public FutureTask(Callable<V> callable)

    建立一個 FutureTask,一旦執行就執行給定的 Callable。

    引數:

    callable - 可呼叫的任務。

    丟擲:

    NullPointerException - 如果 callable 為 null。

 

FutureTask

public FutureTask(Runnable runnable,
                  V result)

    建立一個 FutureTask,一旦執行就執行給定的 Runnable,並安排成功完成時 get 返回給定的結果 。

    引數:

    runnable - 可執行的任務。

    result - 成功完成時要返回的結果。如果不需要特定的結果,則考慮使用下列形式的構造: Future<?> f = new FutureTask<Object>(runnable, null)

    丟擲:

    NullPointerException - 如果 runnable 為 null。

 

 

isCancelled

public boolean isCancelled()

    從介面 Future 複製的描述

        如果在任務正常完成前將其取消,則返回 true。

    指定者:

        介面 Future<V> 中的 isCancelled

    返回:

        如果任務完成前將其取消,則返回 true

 

 

isDone

public boolean isDone()

    從介面 Future 複製的描述

        如果任務已完成,則返回 true。 可能由於正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true。

    指定者:

        介面 Future<V> 中的 isDone

    返回:

        如果任務已完成,則返回 true

 

 

cancel

public boolean cancel(boolean mayInterruptIfRunning)

    從介面 Future 複製的描述

        試圖取消對此任務的執行。如果任務已完成、或已取消,或者由於某些其他原因而無法取消,則此嘗試將失敗。當呼叫 cancel 時,如果呼叫成功,而此任務尚未啟動,則此任務將永不執行。如果任務已經啟動,則 mayInterruptIfRunning 引數確定是否應該以試圖停止任務的方式來中斷執行此任務的執行緒。

    此方法返回後,對 Future.isDone() 的後續呼叫將始終返回 true。如果此方法返回 true,則對 Future.isCancelled() 的後續呼叫將始終返回 true。

    指定者:

        介面 Future<V> 中的 cancel

    引數:

    mayInterruptIfRunning - 如果應該中斷執行此任務的執行緒,則為 true;否則允許正在執行的任務執行完成

    返回:

        如果無法取消任務,則返回 false,這通常是由於它已經正常完成;否則返回 true

 

get

public V get()
      throws InterruptedException,
             ExecutionException

    從介面 Future 複製的描述

        如有必要,等待計算完成,然後獲取其結果。

    指定者:

        介面 Future<V> 中的 get

    返回:

        計算的結果

    丟擲:

    CancellationException - 如果計算被取消

    InterruptedException - 如果當前的執行緒在等待時被中斷

    ExecutionException - 如果計算丟擲異常

 

get

public V get(long timeout,
             TimeUnit unit)
      throws InterruptedException,
             ExecutionException,
             TimeoutException

    從介面 Future 複製的描述

        如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。

    指定者:

        介面 Future<V> 中的 get

    引數:

    timeout - 等待的最大時間

    unit - timeout 引數的時間單位

    返回:

        計算的結果

    丟擲:

    CancellationException - 如果計算被取消

    InterruptedException - 如果當前的執行緒在等待時被中斷

    ExecutionException - 如果計算丟擲異常

    TimeoutException - 如果等待超時

 

 

done

protected void done()

    當此任務轉換到狀態 isDone(不管是正常地還是通過取消)時,呼叫受保護的方法。預設實現不執行任何操作。子類可以重寫此方法,以呼叫完成回撥或執行簿記。注意,可以查詢此方法的實現內的狀態,從而確定是否已取消了此任務。

 

 

set

protected void set(V v)

    除非已經設定了此 Future 或已將其取消,否則將其結果設定為給定的值。在計算成功完成時通過 run 方法內部呼叫此方法。

    引數:

    v - 值

 

setException

protected void setException(Throwable t)

    除非已經設定了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。在計算失敗時通過 run 方法內部呼叫此方法。

    引數:

    t - 失敗的原因

 

 

run

public void run()

    除非已將此 Future 取消,否則將其設定為其計算的結果。

    指定者:

        介面 Runnable 中的 run

    指定者:

        介面 RunnableFuture<V> 中的 run

    另請參見:

    Thread.run()

 

runAndReset

protected boolean runAndReset()

    執行計算而不設定其結果,然後將此 Future 重置為初始狀態,如果計算遇到異常或已取消,則該操作失敗。本操作被設計用於那些本質上要執行多次的任務。

    返回:

        如果成功執行並重置,則返回 true。