1. 程式人生 > >你一定會需要的FutureTask線上程池中應用和原始碼解析

你一定會需要的FutureTask線上程池中應用和原始碼解析

FutureTask 是一個支援取消的非同步處理器,一般線上程池中用於非同步接受callable返回值。

主要實現分三部分:

封裝 Callable,然後放到執行緒池中去非同步執行->run。

獲取結果-> get。

取消任務-> cancel。

接下來主要學習下該模型如何實現。

舉例說明FutureTask線上程池中的應用

// 第一步,定義執行緒池,

ExecutorService executor = new ThreadPoolExecutor(

        minPoolSize,

        maxPollSize,

        keepAliveTime,

        TimeUnit.SECONDS,

        new SynchronousQueue<>());

 

// 第二步,放到執行緒池中執行,返回FutureTask

FutureTask  task = executor.submit(callable);

 

// 第三步,獲取返回值

T data = task.get();

學習FutureTask實現

類屬性

//以下是FutureTask的各種狀態

private volatile int state;

private static final int NEW          = 0;

private static final int COMPLETING   = 1;

private static final int NORMAL       = 2;

private static final int EXCEPTIONAL  = 3;

private static final int CANCELLED    = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED  = 6;

 

private Callable<V> callable; //執行的任務

private Object outcome; //儲存結果或者異常

private volatile Thread runner;//執行callable的執行緒

private volatile WaitNode waiters; //呼叫get方法等待獲取結果的執行緒棧

 

其中各種狀態存在 最終狀態 status>COMPLETING

1)NEW -> COMPLETING -> NORMAL(有正常結果)

2) NEW -> COMPLETING -> EXCEPTIONAL(結果為異常)

3) NEW -> CANCELLED(無結果)

4) NEW -> INTERRUPTING -> INTERRUPTED(無結果)

類方法

從上面舉例說明開始分析。

run()方法

FutureTask 繼承 Runnable,ExecutorService submit 把提交的任務封裝成 FutureTask 然後放到執行緒池 ThreadPoolExecutor 的 execute 執行。

public void run() {

    //如果不是初始狀態或者cas設定執行執行緒是當前執行緒不成功,直接返回

    if (state != NEW ||

        !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                     null, Thread.currentThread()))

        return;

    try {

        Callable<V> c = callable;

        if (c != null && state == NEW) {

            V result;

            boolean ran;

            try {

              // 執行callable任務 這裡對異常進行了catch

                result = c.call();

                ran = true;

            } catch (Throwable ex) {

                result = null;

                ran = false;

                setException(ex); // 封裝異常到outcome

            }

            if (ran)

                set(result);

        }

    } finally {

        runner = null;

        int s = state;

        // 這裡如果是中斷中,設定成最終狀態

        if (s >= INTERRUPTING)

            handlePossibleCancellationInterrupt(s);

    }

}

以上是 run 方法原始碼實現很簡單,解析如下:

如果不是始狀態或者 cas 設定執行執行緒是當前執行緒不成功,直接返回,防止多個執行緒重複執行。

執行 Callable 的 call(),即提交執行任務(這裡做了catch,會捕獲執行任務的異常封裝到 outcome 中)。

如果成功執行 set 方法,封裝結果。

set 方法

protected void set(V v) {

    //cas方式設定成completing狀態,防止多個執行緒同時處理

    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

        outcome = v; // 封裝結果

        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設定成normal狀態

 

        finishCompletion();

    }

}

解析如下:

cas方式設定成completing狀態,防止多個執行緒同時處理

封裝結果到outcome,然後設定到最終狀態normal

執行finishCompletion方法。

finishCompletion方法

// state > COMPLETING; 不管異常,中斷,還是執行完成,都需要執行該方法來喚醒呼叫get方法阻塞的執行緒

private void finishCompletion() {

    // assert state > COMPLETING;

    for (WaitNode q; (q = waiters) != null;) {

        // cas 設定waiters為null,防止多個執行緒執行。

        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

            // 迴圈喚醒所有等待結果的執行緒

            for (;;) {

                Thread t = q.thread;

                if (t != null) {

                    q.thread = null;

                    //喚醒執行緒

                    LockSupport.unpark(t);

                }

                WaitNode next = q.next;

                if (next == null)

                    break;

                q.next = null; // unlink to help gc

                q = next;

            }

            break;

        }

    }

   //該方法為空,可以被重寫

    done();

    callable = null;        // to reduce footprint

}

解析如下:

遍歷waiters中的等待節點,並通過 LockSupport 喚醒每一個節點,通知每個執行緒,該任務執行完成(可能是執行完成,也可能 cancel,異常等)。

以上就是執行的過程,接下來分析獲取結果的過程->get。

get 方法

public V get() throws InterruptedException, ExecutionException {

    int s = state;

    if (s <= COMPLETING)

        s = awaitDone(false, 0L);

    return report(s);

}

public V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException {

        if (unit == null)

            throw new NullPointerException();

        int s = state;

        if (s <= COMPLETING &&

            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

            throw new TimeoutException();

        return report(s);

    }

解析如下:

以上兩個方法,原理一樣,其中一個設定超時時間,支援最多阻塞多長時間。

狀態如果小於 COMPLETING,說明還沒到最終狀態,(不管是否是成功、異常、取消)。

呼叫 awaitDone 方法阻塞執行緒,最終呼叫 report 方法返回結果。

awaitDone 方法

private int awaitDone(boolean timed, long nanos)

        throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;

        WaitNode q = null;

        boolean queued = false;

        for (;;) {

            //執行緒可中斷,如果當前阻塞獲取結果執行緒執行interrupt()方法,則從佇列中移除該節點,並丟擲中斷異常

            if (Thread.interrupted()) {

                removeWaiter(q);

                throw new InterruptedException();

            }

            int s = state;

            // 如果已經是最終狀態,退出返回

            if (s > COMPLETING) {

                if (q != null)

                    q.thread = null;

                return s;

            }

            //這裡做了個優化,competiting到最終狀態時間很短,通過yield比掛起響應更快。

            else if (s == COMPLETING) // cannot time out yet

                Thread.yield();

            // 初始化該阻塞節點

            else if (q == null)

                q = new WaitNode();

            // cas方式寫到阻塞waiters棧中

            else if (!queued)

                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

                                                     q.next = waiters, q);

            // 這裡做阻塞時間處理。

            else if (timed) {

                nanos = deadline - System.nanoTime();

                if (nanos <= 0L) {

                    removeWaiter(q);

                    return state;

                }

                // 阻塞執行緒,有超時時間

                LockSupport.parkNanos(this, nanos);

            }

            else

                // 阻塞執行緒

                LockSupport.park(this);

        }

    }

解析如下:

整體流程已寫到註解中,整體實現是放在一個死迴圈中,唯一出口,是達到最終狀態。

然後是構建節點元素,並將該節點入棧,同時阻塞當前執行緒等待執行主任務的執行緒喚醒該節點。

report 方法

private V report(int s) throws ExecutionException {

    Object x = outcome;

    if (s == NORMAL)

        return (V)x;

    if (s >= CANCELLED)

        throw new CancellationException();

    throw new ExecutionException((Throwable)x);

}

然後是report方法,如果是正常結束,返回結果,如果不是正常結束(取消,中斷)丟擲異常。

最後分析下取消流程。

cancel 方法

public boolean cancel(boolean mayInterruptIfRunning) {

    if (!(state == NEW &&

          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

        return false;

    try {    // in case call to interrupt throws exception

        if (mayInterruptIfRunning) {

            try {

                Thread t = runner;

                if (t != null)

                    t.interrupt();

            } finally { // final state

                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

            }

        }

    } finally {

        finishCompletion();

    }

    return true;

}

解析如下:

mayInterruptIfRunning引數是是否允許執行中被中斷取消。

根據入參是否為true,CAS設定狀態為INTERRUPTING或CANCELLED,設定成功,繼續第二步,否則直接返回false。

如果允許執行中被中斷取消,呼叫runner.interupt()進行中斷取消,設定狀態為INTERRUPTED。

喚醒所有在get()方法等待的執行緒。

此處有兩種狀態轉換:

如果mayInterruptIfRunning為true:status狀態轉換為 new -> INTERRUPTING->INTERRUPTED。主動去中斷執行執行緒,然後喚醒所有等待結果的執行緒。

如果mayInterruptIfRunning為false:status狀態轉換為 new -> CANCELLED。

不會去中斷執行執行緒,直接喚醒所有等待結果的執行緒,從 awaitDone 方法中可以看到,喚醒等待執行緒後,直接從跳轉回 get 方法,然後把結果返回給獲取結果的執行緒,當然此時的結果是 null。

總結

以上就是 FutureTask 的原始碼簡單解析,實現比較簡單,FutureTask 就是一個實現 Future 模式,支援取消的非同步處理器。

歡迎工作一到五年的Java工程師朋友們加入Java架構開發: 854393687
群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!