1. 程式人生 > >Future、FutureTask實現原理淺析

Future、FutureTask實現原理淺析

前言

最近一直在看JUC下面的一些東西,發現很多東西都是以前用過,但是真是到原理層面自己還是很欠缺。
剛好趁這段時間不太忙,回來了便一點點學習總結。
由於自己水平有限,可能存在大量漏洞和思考不周到的地方,不吝賜教。
另外本文章首發本人部落格園: 部落格園地址

Future 模式

一種非常經典的設計模式,這種設計模式主要就利用空間換時間得到概念,也就是說非同步執行(需要開啟一個新的執行緒)。
在網際網路高併發的應用服務中,我們隨處可見這種理念和程式碼,主要就是使用了這種模式。
Future模式非常適合在處理耗時很長的業務邏輯時進行使用,可以有效的減小系統的響應時間,提高系統的吞吐量。

Future程式碼示例:

/**
 * @Description:
 * @Author: wangmeng
 * @Date: 2018/12/17-20:44
 */
public class UseFuture implements Callable<String> {

    private String param;

    public UseFuture(String param) {
        this.param = param;
    }

    @Override
    public String call() throws Exception {
//模擬執行業務邏輯的耗時 TimeUnit.SECONDS.sleep(3); String result = this.param + " 處理完成!"; return result; } public static void main(String[] args) throws Exception{ String queryStr = "query1"; String queryStr2 = "query2"; FutureTask<String> future1 =
new FutureTask<String>(new UseFuture(queryStr)); FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr2)); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(future1);//非同步操作 executorService.submit(future2);//非同步操作 System.out.println("執行中..."); TimeUnit.SECONDS.sleep(2);//處理其他相關的任務。 String result1 = future1.get(); String result2 = future2.get(); System.out.println("資料處理完成。。" + result1); System.out.println("資料處理完成。。" + result2); } }

####應用場景
Future模式有點類似於商品訂單,比如在網購時,當看中某一件商品時,就可以提交訂單,當訂單處理完成後,在家等待商品送貨上門即可。
或者說更形象的我們傳送Ajax請求的時候,頁面是非同步的進行後臺處理,使用者無需一直等待請求的結果,可以繼續瀏覽或操作其他內容。
流程圖

Future實現原理

看到上面示例程式碼,我們是通過executorService.submit(future1) 來提交執行緒的,進一步看看裡面具體的邏輯。

1、 AbstractExecutorService 中submit()原始碼:
submit

2、FutureTask中run()原始碼:

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

這個是核心程式碼,首先我們需要知道FutureTask中有一個volatile state全域性變數,通過這個值來界定任務是否已經執行完畢。

state
將上面run方法一點點拆解如下:
run方法
先判斷state狀態,如果不是NEW說明執行完畢,直接return掉。
後面使用CAS操作,判斷這個任務是否已經執行,這裡FutureTask有個全域性的volatile runner欄位,這裡通過cas將當前執行緒指定給runner。
這裡可以防止callable被執行多次。
接著往下看:
run方法2

檢視set方法具體實現:
set方法

繼續往下跟,檢視finishCompletion方法:
FutureTask中有一個WaiteNode單鏈表,當執行futureTask.get()方法時,多個執行緒會將等待的執行緒的next指向下一個想要get獲取結果的執行緒。
finishCompletion主要就是使用Unsafe.unpark()進行喚醒操作。
finish

3,FutureTask.get() 原始碼
get() 方法會進行自旋操作等待,直到FutureTask中的state狀態大於NORMAL(表示自行完成),然後才會通過FutureTask的outcome獲取返回值。
get
接著往下跟awaitDone方法:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

還是老樣子,一點點分析:
await
await2

不知道自己理解的是否有偏差,有問題歡迎大家隨時指出,感謝備至。
到了這裡 就不再講解了,後面還有report、cancel等方法,大家可以自行參閱原始碼。

總結

結合上述分析可得 FutureTask 執行活動圖如下:
流程圖
同時也可以看出,在 FutureTask 中內部維護了一個單向連結串列 waiters , 在執行 get 的時候會向其中新增節點:
waiters

最後特別感謝掘金此位博主的分享,參考如下:
FutureTask 原始碼分析