1. 程式人生 > >Fork-Join 原理深入分析(二)

Fork-Join 原理深入分析(二)

框架 ryu 循環 app bject ber setname 索引 skip

??本文是將 Fork-Join 復雜且較為龐大的框架分成5個小點來分析 Fork-Join 框架的實現原理,一個個點地理解透 Fork-Join 的核心原理。

1. Frok-Join 框架的核心類的結構分析

??Fork-Join 框架有三個核心類:ForkJoinPool,ForkJoinWorkerThread,ForkJoinTask。下面將分析這三個類的數據結構,初步了解三個類的核心成員。

ForkJoinPool

//繼承了 AbstractExecutorService 類
public class ForkJoinPool extends AbstractExecutorService{

    //任務隊列數組,存儲了所有任務隊列,包括 內部隊列 和 外部隊列
volatile WorkQueue[] workQueues; // main registry //一個靜態常量,ForkJoinPool 提供的內部公用的線程池 static final ForkJoinPool common; //默認的線程工廠類 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; }

ForkJoinWorkerThread

//繼承了 Thread 類
public class ForkJoinWorkerThread extends
Thread { //線程工作的線程池,即此線程所屬的線程池 final ForkJoinPool pool; // 線程的內部隊列 final ForkJoinPool.WorkQueue workQueue; //..... }

2. ForkJoinPool 中線程的創建

2.1 默認的線程工廠類

ForkJoinPool 中的線程是由默認的線程工廠類 defaultForkJoinWorkerThreadFactory 創建的

//默認的工廠類
  public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

defaultForkJoinWorkerThreadFactory =
            new
DefaultForkJoinWorkerThreadFactory();

defaultForkJoinWorkerThreadFactory 創建線程的方法 newThread(),其實就是傳入當前的線程池,直接創建。

 /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

2.2 ForkJoinWorkerThread 的構造方法

 protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        //線程工作的線程池,即創建這個線程的線程池
        this.pool = pool;
        //註冊線程到線程池中,並返回此線程的內部任務隊列
        this.workQueue = pool.registerWorker(this);
    }

創建一個工作線程,最後一步還要註冊到其所屬的線程池中,看下面源碼,註冊的過程可以分為兩步:

  1. 創建一個新的任務隊列
  2. 為此任務隊列分配一個線程池的索引,將任務隊列存儲在線程數組 workQueues 的此索引位置,並返回這個任務隊列,作為線程的內部任務隊列。線程註冊成功。
  final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);               // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        //創建一個任務隊列
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                             //分配一個線程池的索引
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT; // unlikely to collide
                int m = n - 1;
                //計算 索引
                i = ((s << 1) | 1) & m;               // odd-numbered indices
 
               if (ws[i] != null) {                   //如果索引沖突
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            //擴容:以原來的數組的長度的兩倍來創建一個新的數組,再復制舊數組的內衣
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                //剛創建的任務隊列加入到線程池的 任務隊列數組中
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

??對應註冊線程,ForkJoinPool 也提供了一個取消線程註冊的方法 deregisterWorker(),在線程被銷毀的時候調用,此處就不說了。

3. ForkJoinTask的fork()、join()方法

??在上一篇文章中,我們在實現 分治編程時,主要就是調用 ForkJoinTaskfork()join() 方法。fork() 方法用於提交子任務,而 join() 方法則用於等待子任務的完成。而這個過程中,將涉及到 “工作竊取算法”。

3.1 fork( ) 方法提交任務

先來看一下 fork() 方法的源碼

public final ForkJoinTask<V> fork() {
        Thread t;
        //判斷是否是一個 工作線程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //加入到內部隊列中
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else//由 common 線程池來執行任務
            ForkJoinPool.common.externalPush(this);
        return this;
    }

??源碼中,fork()方法先判斷當前線程(調用fork()來提交任務的線程)是不是一個 ForkJoinWorkerThread 的工作線程,如果是,則將任務加入到內部隊列中,否則,由 ForkJoinPool 提供的內部公用的線程池 common 線程池 來執行這個任務。

   //ForkJoinPool 提供的內部公用的線程池
    static final ForkJoinPool common;

??順便說一下,根據上面的說法,意味著我們可以在普通線程池中直接調用 fork() 方法來提交任務到一個默認提供的線程池中。這將非常方便。假如,你要在程序中處理大任務,需要分治編程,但你僅僅只處理一次,以後就不會用到,而且任務不算太大,不需要設置特定的參數,那麽你肯定不想為此創建一個線程池,這時默認的提供的線程池將會很有用。

下面是我基於上一篇文章例子改造的,CountTask 類在我上一篇文章中找到

public class Test_34 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 創建一個計算任務,計算 由1加到12
        CountTask countTask2 = new CountTask(1, 12);
        //直接在main線程中調用 fork 來提交任務,
        countTask2.fork();
        //沒有創建線程池,使用commonPool線程池
        System.out.println(countTask2.get());
    }
}

運行結果:

任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool.commonPool-worker-1
任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool.commonPool-worker-2
任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool.commonPool-worker-3
執行計算任務,計算 1到 3的和 ,結果是:6 執行此任務的線程:ForkJoinPool.commonPool-worker-2
執行計算任務,計算 4到 6的和 ,結果是:15 執行此任務的線程:ForkJoinPool.commonPool-worker-1
執行計算任務,計算 7到 9的和 ,結果是:24 執行此任務的線程:ForkJoinPool.commonPool-worker-3
執行計算任務,計算 10到 12的和 ,結果是:33 執行此任務的線程:ForkJoinPool.commonPool-worker-1
78

註意執行任務的線程名稱:commonPool表示執行任務的線程是公用的ForkJoinPooL線程池中的線程,上面的例子中,並沒有創建一個新的ForKJoinPool線程池

3.2 join( ) 等待任務的完成

 public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();//直接返回結果
    }

??重點在 dojoin() 方法,下面追蹤下去

 private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return 
            //如果完成,直接返回s
            (s = status) < 0 ? s : 
            //沒有完成,判斷是不是池中的 ForkJoinWorkerThread 工作線程
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            //如果是池中線程,執行這裏
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            //如果不是池中的線程池,則執行這裏
            externalAwaitDone();
    }

??仔細看上面的註釋。當 dojoin( )方法發現任務沒有完成且當前線程是池中線程時,執行了 tryUnpush( )方法。tryUnpush()方法嘗試去執行此任務:如果要join的任務正好在當前任務隊列的頂端,那麽pop出這個任務,然後調用 doExec() 讓當前線程去執行這個任務。

 final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                U.putOrderedInt(this, QTOP, s);
                return true;
            }
            return false;
        }
 final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

??如果任務不是處於隊列的頂端,那麽就會執行 awaitJoin( ) 方法。

 /**
     * Helps and/or blocks until the given task is done or timeout.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask<?> prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>)task : null;
            for (;;) {
                if ((s = task.status) < 0)//如果任務完成了,跳出死循環
                    break;
                if (cc != null)//當前任務是CountedCompleter類型,則嘗試從任務隊列中獲取當前任務的派生子任務來執行;
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))//如果當前線程的內部隊列為空,或者成功完成了任務,幫助某個線程完成任務。
                    helpStealer(w, task);
                if ((s = task.status) < 0)//任務完成,跳出死循環
                    break;
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
                if (tryCompensate(w)) {
                    task.internalWait(ms);
                    U.getAndAddLong(this, CTL, AC_UNIT);
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
    }

重點說一下helpStealer。helpStealer的原則是你幫助我執行任務,我也幫你執行任務。

  • 遍歷奇數下標,如果發現隊列對象currentSteal放置的剛好是自己要找的任務,則說明自己的任務被該隊列A的owner線程偷來執行
  • 如果隊列A隊列中有任務,則從隊尾(base)取出執行;
  • 如果發現隊列A隊列為空,則根據它正在join的任務,在拓撲找到相關的隊列B去偷取任務執行。
    在執行的過程中要註意,我們應該完整的把任務完成

還有剩下的幾個比較核心的部分源碼就不再此處分析,提供兩個比較棒的博文:(因為我還有一些疑惑沒解決,以後再補充)

  • jdk1.8-ForkJoin框架剖析
  • Jdk1.7 JUC源碼增量解析(3)-ForkJoin-非ForkJoin任務的執行過程

最後,有興趣的還可以看一下Doug Lea 的寫的Fork-Join 框架的文章
原文:A Java Fork/Join Framework
中文譯文:Fork/Join 框架-設計與實現

參考文獻:

  • PunyGod https://www.jianshu.com/p/f777abb7b251

Fork-Join 原理深入分析(二)