1. 程式人生 > >Java併發程式設計系列之二十 Fork/Join框架

Java併發程式設計系列之二十 Fork/Join框架

                       

Fork/Join框架簡介
Fork/Join框架是Java 7提供的用於並行執行任務的框架。具體是把大任務切分為小任務,再把小任務的結果彙總為大任務的結果。從這兩個單詞的角度分析,Fork是分叉的意思,可以引申為切分,Join是加入的意思,可以引申為合併。Fork的作用是把大任務切分為小任務,Join則是把這些小任務的執行結果進行合併的過程。

以計算1+2+3+4為例,假設閾值是2,那麼Fork會將這個計算任務切分為1+2和3+4兩個計算任務並行執行,Join則把1+2這個計算任務的執行結果,也就是3,和3+4這個計算任務的執行結果,也就是7,進行合併,也就是合併3+7,得到的最終的結果就是10了。

工作竊取演算法
工作竊取演算法是指執行緒從其他任務佇列中竊取任務執行(可能你會很詫異,這個演算法有什麼用。待會你就知道了)。考慮下面這種場景:有一個很大的計算任務,為了減少執行緒的競爭,會將這些大任務切分為小任務並分在不同的佇列等待執行,然後為每個任務佇列建立一個執行緒執行佇列的任務。那麼問題來了,有的執行緒可能很快就執行完了,而其他執行緒還有任務沒執行完,執行完的執行緒與其空閒下來不如幫助其他執行緒執行任務,這樣也能加快執行程序。所以,執行完的空閒執行緒從其他佇列的尾部竊取任務執行,而被竊取任務的執行緒則從佇列的頭部取任務執行(這裡使用了雙端佇列,既不影響被竊取任務的執行過程又能加快執行進度)。

從以上的介紹中,能夠發現工作竊取演算法的優點是充分利用執行緒提高並行執行的進度。當然缺點是在某些情況下仍然存在競爭,比如雙端佇列只有任務需要執行的時候。

Fork/Join框架詳解

使用Fork/Join框架分為兩步:

  • 分割任務:首先需要建立一個ForkJoin任務,執行該類的fork方法可以對任務不斷切割,直到分割的子任務足夠小
  • 合併任務執行結果:子任務執行的結果同一放在一個佇列中,通過啟動一個執行緒從佇列中取執行結果。

下面是計算1+2+3+4為例演示如何使用使用Fork/Join框架:

package com.rhwayfun.concurrency.r0406;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;/** * Created by rhwayfun on 16-4-6. */
public class CountTask extends RecursiveTask<Integer>{    //閾值    private static final int THRESHOLD = 2;    //起始值    private int start;    //結束值    private int end;    public CountTask(int start, int end) {        this.start = start;        this.end = end;    }    @Override    protected Integer compute() {        boolean compute = (end - start) <= THRESHOLD;        int res = 0;        if (compute){            for (int i = start; i <= end; i++){                res += i;            }        }else {            //如果長度大於閾值,則分割為小任務            int mid = (start + end) / 2;            CountTask task1 = new CountTask(start,mid);            CountTask task2 = new CountTask(mid + 1, end);            //計算小任務的值            task1.fork();            task2.fork();            //得到兩個小任務的值            int task1Res = task1.join();            int task2Res = task2.join();            res = task1Res + task2Res;        }        return res;    }    public static void main(String[] args) throws ExecutionException, InterruptedException {        ForkJoinPool pool = new ForkJoinPool();        CountTask task = new CountTask(1,5);        ForkJoinTask<Integer> submit = pool.submit(task);        System.out.println("Final result:" + submit.get());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

程式碼執行結果為:

 

15

程式碼中使用了FokJoinTask,其與一般任務的區別在於它需要實現compute方法,在方法需要判斷任務是否在閾值區間內,如果不是則需要把任務切分到足夠小,直到能夠進行計算。每個被切分的子任務又會重新進入compute方法,再繼續判斷是否需要繼續切分,如果不需要則直接得到子任務執行的結果,如果需要的話則繼續切分,如此迴圈,直到呼叫join方法得到最終的結果。

可以發現Fork/Join框架的需要把提交給ForkJoinPool,ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,前者負責將存放程式提交給ForkJoinPool的任務,後者則負責執行這些任務。關鍵在於在於fork方法與join方法。先看看fork方法的實現原理:

    public final ForkJoinTask<V> fork() {        Thread t;        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)            ((ForkJoinWorkerThread)t).workQueue.push(this);        else            ForkJoinPool.common.externalPush(this);        return this;    }    //把當前任務放入ForkJoinTask陣列佇列中,然後呼叫signalWork    //方法喚醒或者建立一個新的工作執行緒執行任務    final void push(ForkJoinTask<?> task) {            ForkJoinTask<?>[] a; ForkJoinPool p;            int b = base, s = top, n;            if ((a = array) != null) {    // ignore if queue removed                int m = a.length - 1;     // fenced write for task visibility                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);                U.putOrderedInt(this, QTOP, s + 1);                if ((n = s - b) <= 1) {                    if ((p = pool) != null)                        p.signalWork(p.workQueues, this);                }                else if (n >= m)                    growArray();            }        }
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

再看看join方法的實現原理:

    //返回已經執行完畢的子任務的結果    public final V join() {        int s;        if ((s = doJoin() & DONE_MASK) != NORMAL)            reportException(s);        return getRawResult();    }
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

原始碼中主要呼叫了doJoin方法判斷當前任務執行的狀態,任務的狀態共有以下幾種:

    //完成的掩碼    static final int DONE_MASK   = 0xf0000000;      //執行完畢    static final int NORMAL      = 0xf0000000;      //被取消    static final int CANCELLED   = 0xc0000000;      //出現異常    static final int EXCEPTIONAL = 0x80000000;      //訊號    static final int SIGNAL      = 0x00010000;      //訊號掩碼    static final int SMASK       = 0x0000ffff
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

看看doJoin方法原始碼:

    private int doJoin() {        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;        return (s = status) < 0 ? s :            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?            (w = (wt = (ForkJoinWorkerThread)t).workQueue).            tryUnpush(this) && (s = doExec()) < 0 ? s :            wt.pool.awaitJoin(w, this, 0L) :            externalAwaitDone();    }
   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

首先判斷當前任務的狀態,如果已經執行完畢直接返回任務狀態;如果沒有執行完則從任務陣列中取出任務並執行(原始碼中的doExec方法),然後再判斷任務的狀態,如果順利完成,則設定任務狀態為NORMAL,如果出現異常則記錄該異常並且設定任務的狀態為EXCEPTIONAL。

           

再分享一下我老師大神的人工智慧教程吧。零基礎!通俗易懂!風趣幽默!還帶黃段子!希望你也加入到我們人工智慧的隊伍中來!https://blog.csdn.net/jiangjunshow