1. 程式人生 > >jdk之多執行緒Future框架解析

jdk之多執行緒Future框架解析

       其中有錯誤和不足之處請大家提出來,希望和大家一起討論學習和進步;首先來看官方對Future的描述:Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

接下來看下Future框架類結構圖(沒有包含全部,當前先研究比較常用的FutureTask)  


      從圖中可以看到FutureTask直接繼承自RunableFuture,RunableFuture分別繼承自Future和Runable;Runable大家都很熟悉,就是一個run方法並沒有返回結果,只管執行;而Future中就是一些獲取執行緒執行狀態和結果的方法了;

      接下來我們結合FutureTask原始碼來解析FutureTask的原理,從上面FutureTask的類圖中可以看出,FutureTask就一個屬性sync,這是一個複雜型別的屬性,其型別是FutureTask的私有內部類Sync,獲取執行緒執行狀態和執行結果的核心邏輯就是在這個類中實現的;先看FutureTask這個實現類,他有兩個含參建構函式

 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }

public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
       這兩個建構函式區別在於,如果使用第一個建構函式最後獲取執行緒實行結果就是callable的執行的返回結果;而如果使用第二個建構函式那麼最後獲取執行緒實行結果就是引數中的result,接下來讓我們看一下FutureTask的run方法
public void run() {
        sync.innerRun();
    }
正如上面所提到的,實現是在sync類中,那麼我們就接著看
sync.innerRun()方法
void innerRun() {
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // recheck after setting thread
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
        在這裡首先提一下FutureTask中所有的狀態的改變和加鎖、阻塞都是動過AQS資料結構來實現的就是AbstractQueuedSynchronizer,有興趣的可瞭解下;
標識FutureTask執行緒的狀態欄位是AbstractQueuedSynchronizer.Node.state;方法innerRun首先判斷執行緒是否是初始狀態,如果是就將狀態變成執行中狀態,也就是compareAndSetState(0, RUNNING)做的事情;修改狀態之後就是執行執行緒方法,並把結果賦值給Sync.result欄位對應程式碼為 innerSet(callable.call());我們自己實現的執行緒邏輯也就是通過<span style="font-family: Arial, Helvetica, sans-serif;">callable.call()執行的,接下來</span><span style="font-family: Arial, Helvetica, sans-serif;">我們在看一下innerSet方法</span>
 void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
		    return;
                }
            }
        }

這段方法的邏輯就是判斷當前執行緒執行狀態,如果正在執行就把轉態改為執行完畢狀態,並把執行結果賦值給Sync.result,若為終止狀態,就直接返回;ok,接下來我再看看獲取執行緒執行結果get方法

public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }

同樣的邏輯在sync.innerGet()中

  V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }

該方法會一直阻塞到執行緒執行完畢,執行緒執行中間如被取消獲中斷則丟擲相應異常,其他幾個方法也都類似;整個Future框架FutureTask的實現難點還是在於狀態的改變和加鎖、阻塞的過程也就是AbstractQueuedSynchronizer實現,接下來的一篇就著重介紹AbstractQueuedSynchronizer的原理,敬請期待