1. 程式人生 > >Fork-Join分治編程介紹(一)

Fork-Join分治編程介紹(一)

會有 post 就會 override 大於 uri nco xtend fork

一、Fork-Join 框架介紹

1. 什麽是 Fork-Join 分治編程框架

??Fork/Join框架是Java7提供了的一個用於並行執行任務的框架,是一個把大任務分割成若幹個小任務,最終匯總每個小任務結果後得到大任務結果的框架,這種開發方法也叫 分治編程。分治編程可以極大地利用CPU資源,提高任務執行的效率,也是目前與多線程有關的前沿技術。

2. 分治編程會遇到什麽問題

??分治的原理上面說了,就是切割大任務成小任務來完成。咦,看起來好像也不難實現啊!為什麽專門弄一個新的框架呢?
我們先看一下,在不使用 Fork-Join 框架時,使用普通的線程池是怎麽實現的。

  • 我們往一個線程池提交了一個大任務,規定好任務切割的閥值。
  • 由池中線程(假設是線程A)執行大任務,發現大任務的大小大於閥值,於是切割成兩個子任務,並調用 submit() 提交到線程池,得到返回的子任務的 Future。
  • 線程A就調用 返回的 Future 的 get() 方法阻塞等待子任務的執行結果。
  • 池中的其他線程(除線程A外,線程A被阻塞)執行兩個子任務,然後判斷子任務的大小有沒有超過閥值,如果超過,則按照步驟2繼續切割,否則,才計算並返回結果。

嘿,好像一切都很美好。真的嗎?別忘了, 每一個切割任務的線程(如線程A)都被阻塞了,直到其子任務完成,才能繼續往下運行 。如果任務太大了,需要切割多次,那麽就會有多個線程被阻塞,性能將會急速下降。更糟糕的是,如果你的線程池的線程數量是有上限的,極可能會造成池中所有線程被阻塞,線程池無法執行任務。

@ Example1 普通線程池實現分治時阻塞的問題

來看一個例子,體會一下吧!下面的例子是將 1+2+...+10 的任務 分割成相加的個數不能超過3(即兩端的差不能大於2)的多個子任務。

//普通線程池下實現的分治效果測試
public class CommonThreadPoolTest {
    //固定大小的線程池,池中線程數量為3
    static ExecutorService fixPoolExcutor = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //計算 1+2+...+10  的結果
CountTaskCallable task = new CountTaskCallable(1,10); //提交主人翁 Future<Integer> future = fixPoolExcutor.submit(task); System.out.println("計算的結果:"+future.get()); } }
class CountTaskCallable implements Callable<Integer> {

    //設置閥值為2
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    public CountTaskCallable(int start, int end) {
        super();
        this.start = start;
        this.end = end;
    }

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        //判斷任務的大小是否超過閥值
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            System.out.println("切割的任務:"+start+"加到"+end+"   執行此任務的線程是 "+Thread.currentThread().getName());
            int middle = (start + end) / 2;
   
            CountTaskCallable leftTaskCallable = new CountTaskCallable(start, middle);
            CountTaskCallable rightTaskCallable = new CountTaskCallable(middle + 1, end);
            // 將子任務提交到線程池中
            Future<Integer> leftFuture = CommonThreadPoolTest.fixPoolExcutor.submit(leftTaskCallable);
            Future<Integer> rightFuture = CommonThreadPoolTest.fixPoolExcutor.submit(rightTaskCallable);
            //阻塞等待子任務的執行結果
            int leftResult = leftFuture.get();
            int rightResult = rightFuture.get();
            // 合並子任務的執行結果
            sum = leftResult + rightResult;
            
        }
        return sum;
    }

}

運行結果

切割的任務:1加到10 執行此任務的線程是 pool-1-thread-1
切割的任務:1加到5 執行此任務的線程是 pool-1-thread-2
切割的任務:6加到10 執行此任務的線程是 pool-1-thread-3

??池的線程只有三個,當任務分割了三次後,池中的線程也就都被阻塞了,無法再執行任何任務,一直卡著動不了。

3. 工作竊取算法

??針對上面的問題,Fork-Join 框架使用了 “工作竊取(work-stealing)”算法。工作竊取(work-stealing)算法是指某個線程從其他隊列裏竊取任務來執行。看一下《Java 並發編程的藝術》對工作竊取算法的解釋:

使用工作竊取算法有什麽優勢呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若幹互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列裏,並為每個隊列創建一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,比如A線程負責處理A隊列裏的任務。但是有的線程會先把自己隊列裏的任務幹完,而其他線程對應的隊列裏還有任務等待處理。幹完活的線程與其等著,不如去幫其他線程幹活,於是它就去其他線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

技術分享圖片

Fork-Join 框架使用工作竊取算法對分治編程實現的描述:

??下面是 ForkJoin 框架對分治編程實現的過程的描述,增加對工作竊取算法的理解。在下面的內容提供了一個分治的例子,可結合這部分描述一起看。這裏僅是簡單的描述,如果想深入了解,可參考我下一篇的源碼分析的文章,讓你知其然,也知其所以然。

  • Fork-Join 框架的線程池ForkJoinPool 的任務分為“外部任務” 和 “內部任務”。
  • “外部任務”是放在 ForkJoinPool 的全局隊列裏;
  • ForkJoinPool 池中的每個線程都維護著一個內部隊列,用於存放“內部任務”。
  • 線程切割任務得到的子任務就會作為“內部任務”放到內部隊列中。
  • 當此線程要想要拿到子任務的計算結果時,先判斷子任務沒有完成,如果沒有完成,則再判斷子任務有沒有被其他線程“竊取”,一旦子任務被竊取了則去執行本線程“內部隊列”的其他任務,或者掃描其他的任務隊列,竊取任務,如果子任務沒有被竊取,則由本線程來完成。
  • 最後,當線程完成了其“內部任務”,處於空閑的狀態時,就會去掃描其他的任務隊列,竊取任務,盡可能地

總之,ForkJoin線程在等待一個任務的完成時,要麽自己來完成這個任務,或者在其他線程竊取了這個任務的情況下,去執行其他任務,是不會阻塞等待,從而避免浪費資源,除非是所有任務隊列都為空。

工作竊取算法的優點:

Fork-Join 框架中的工作竊取算法的優點可以總結為以下兩點:

  • 線程是不會因為等待某個子任務的完成或者沒有內部任務要執行而被阻塞等待、掛起,而是會掃描所有的隊列,竊取任務,直到所有隊列都為空時,才會被掛起。 就如上面所說的。
  • Fork-Join 框架在多CPU的環境下,能提供很好的並行性能。在使用普通線程池的情況下,當CPU不再是性能瓶頸時,能並行地運行多個線程,然而卻因為要互斥訪問一個任務隊列而導致性能提高不上去。而 Fork-Join 框架為每個線程為維護著一個內部任務隊列,以及一個全局的任務隊列,而且任務隊列都是雙向隊列,可從首尾兩端來獲取任務,極大地減少了競爭的可能性,提高並行的性能。


二、 Fork-Join 框架的使用介紹

JDK7引入的Fork/Join有三個核心類:

ForkJoinPool: 執行任務的線程池,繼承了 AbstractExecutorService 類。
ForkJoinWorkerThread: 執行任務的工作線程(即 ForkJoinPool 線程池裏的線程)。每個線程都維護著一個內部隊列,用於存放“內部任務”。繼承了 Thread 類。
ForkJoinTask: 一個用於ForkJoinPool的任務抽象類。實現了 Future 接口

因為ForkJoinTask比較復雜,抽象方法比較多,日常使用時一般不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類,實現 compute() 方法:

RecursiveTask: 子任務帶返回結果時使用
RecursiveAction: 子任務不帶返回結果時使用

compute 方法的實現模式一般是:

if 任務足夠小
    直接返回結果
else
    分割成N個子任務
    依次調用每個子任務的fork方法執行子任務
    依次調用每個子任務的join方法合並執行結果

對於Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework;

@ Example2 分治例子

??下面的例子與 @Exampel1 是一樣的,計算 1+2+....+12 的結果。
使用Fork/Join框架首先要考慮到的是如何分割任務,如果我們希望每個子任務最多執行兩個數的相加,那麽我們設置分割的閾值是2,由於是12個數字相加。同時,觀察執行任務的線程名稱,理解工作竊取算法的實現。

public class CountTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //創建一個計算任務,計算 由1加到12
        CountTask countTask = new CountTask(1, 12);
        Future<Integer> future = forkJoinPool.submit(countTask);
        System.out.println("最終的計算結果:"+future.get());
    }
}
class CountTask extends RecursiveTask<Integer>{

    private static final int THRESHOLD = 2;
    private int start;
    private int end;
    
    
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        
        if(canCompute){//任務已經足夠小,可以直接計算,並返回結果
            for(int i = start;i<=end;i++){
                sum += i;
            }
            System.out.println("執行計算任務,計算    "+start+"到 "+end+"的和  ,結果是:"+sum+"   執行此任務的線程:"+Thread.currentThread().getName());

        }else{ //任務過大,需要切割
            System.out.println("任務過大,切割的任務:  "+start+"加到 "+end+"的和       執行此任務的線程:"+Thread.currentThread().getName());
            int middle = (start+end)/2;
            //切割成兩個子任務
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle+1, end);
            //執行子任務
            leftTask.fork();
            rightTask.fork();
            //等待子任務的完成,並獲取執行結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合並子任務
            sum = leftResult+rightResult;
        }
        return sum;
    }
}

運行結果:

任務過大,切割的任務: 1加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-1
任務過大,切割的任務: 7加到 12的和 執行此任務的線程:ForkJoinPool-1-worker-3
任務過大,切割的任務: 1加到 6的和 執行此任務的線程:ForkJoinPool-1-worker-2
執行計算任務,計算 7到 9的和 ,結果是:24 執行此任務的線程:ForkJoinPool-1-worker-3
執行計算任務,計算 1到 3的和 ,結果是:6 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 4到 6的和 ,結果是:15 執行此任務的線程:ForkJoinPool-1-worker-1
執行計算任務,計算 10到 12的和 ,結果是:33 執行此任務的線程:ForkJoinPool-1-worker-3
最終的計算結果:78

??從結果可以看出,提交的計算任務是由線程1執行,線程1進行了第一次切割,切割成兩個子任務 “7加到12“ 和 ”1加到6“,並提交這兩個子任務。然後這兩個任務便被 線程2、線程3 給竊取了。線程1 的內部隊列中已經沒有任務了,這時候,線程2、線程3 也分別進行了一次任務切割並各自提交了兩個子任務,於是線程1也去竊取任務(這裏竊取的都是線程2的子任務)。

如果想深入了解 Fork-Join 框架,可參考我的下一篇文章




參考文獻

  • https://www.cnblogs.com/wanly3643/p/3951659.html
  • 《java並發編程的藝術》

Fork-Join分治編程介紹(一)