1. 程式人生 > >【小家java】Java執行緒池之---ForkJoinPool執行緒池的使用以及原理

【小家java】Java執行緒池之---ForkJoinPool執行緒池的使用以及原理

相關閱讀

【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本


【小家java】Java中的執行緒池,你真的用對了嗎?(教你用正確的姿勢使用執行緒池)
小家Java】一次Java執行緒池誤用(newFixedThreadPool)引發的線上血案和總結


【小家java】BlockingQueue阻塞佇列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)
【小家java】用 ThreadPoolExecutor/ThreadPoolTaskExecutor 執行緒池技術提高系統吞吐量(附帶執行緒池引數詳解和使用注意事項)


Java 7 引入了一種新的併發框架—— Fork/Join Framework。同時引入了一種新的執行緒池:ForkJoinPool(ForkJoinPool.coomonPool)

@sun.misc.Contended
public class
ForkJoinPool extends AbstractExecutorService { }

本文的主要目的是介紹 ForkJoinPool 的適用場景,實現原理,以及示例程式碼。

說在前面

可以說是說明,也可以說下面是結論:

  1. ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下效能比 ExecutorService 更好。
  2. ForkJoinPool 主要用於實現“分而治之”的演算法,特別是分治之後遞迴呼叫的函式,例如 quick sort 等。
  3. ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O,執行緒間同步,sleep() 等會造成執行緒長時間阻塞的情況時,最好配合使用 ManagedBlocker。

使用

首先介紹的是大家最關心的 Fork/Join Framework 的使用方法,用一個特別簡單的求整數陣列所有元素之和來作為我們現在需要解決的問題吧。

問題:計算1至10000000的正整數之和。
  • 方案一:最為普通的for迴圈解決
    最簡單的,顯然是不使用任何並行程式設計的手段,只用最直白的 for-loop 來實現。下面就是具體的實現程式碼。

為了面向介面程式設計,下面我們把計算的方法定義成介面,不同的方案書寫不同的實現即可

/**
 * @author [email protected]
 * @description //
 * @date 2018/11/5 14:26
 */
public interface Calculator {

    /**
     * 把傳進來的所有numbers 做求和處理
     *
     * @param numbers
     * @return 總和
     */
    long sumUp(long[] numbers);
}

寫一個通過for loop的實現。這段程式碼毫無出奇之處,也就不多解釋了

/**
 * 通過普通的for迴圈 實現總和的相加 邏輯非常簡單
 *  * @author [email protected]
 * @description //
 * @date 2018/11/5 14:31
 */
public class ForLoopCalculator implements Calculator {

	@Override
    public long sumUp(long[] numbers) {
        long total = 0;
        for (long i : numbers) {
            total += i;
        }
        return total;
    }
}

寫一個main方法進行測試:

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();

        Instant start = Instant.now();
        Calculator calculator = new ForLoopCalculator();
        long result = calculator.sumUp(numbers);
        Instant end = Instant.now();
        System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

        System.out.println("結果為:" + result); 
    }
輸出:
耗時:10ms
結果為:50000005000000
  • 方案二:ExecutorService多執行緒方式實現
    在 Java 1.5 引入 ExecutorService 之後,基本上已經不推薦直接建立 Thread 物件,而是統一使用 ExecutorService。畢竟從介面的易用程度上來說 ExecutorService 就遠勝於原始的 Thread,更不用提 java.util.concurrent 提供的數種執行緒池,Future 類,Lock 類等各種便利工具。

由於上面是面向介面的設計,因此我們只需要加一個使用 ExecutorService 的實現類:

/**
 * 使用ExecutorService實現多執行緒的求和
 *  * @author [email protected]
 * @description //
 * @date 2018/11/5 14:45
 */
public class ExecutorServiceCalculator implements Calculator {

    private int parallism;
    private ExecutorService pool;

    public ExecutorServiceCalculator() {
        parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心數 預設就用cpu核心數了
        pool = Executors.newFixedThreadPool(parallism);
    }

    //處理計算任務的執行緒
    private static class SumTask implements Callable<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() {
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        }
    }


    @Override
    public long sumUp(long[] numbers) {
        List<Future<Long>> results = new ArrayList<>();

        // 把任務分解為 n 份,交給 n 個執行緒處理   4核心 就等分成4份唄
        // 然後把每一份都扔個一個SumTask執行緒 進行處理
        int part = numbers.length / parallism;
        for (int i = 0; i < parallism; i++) {
            int from = i * part; //開始位置
            int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //結束位置

            //扔給執行緒池計算
            results.add(pool.submit(new SumTask(numbers, from, to)));
        }

        // 把每個執行緒的結果相加,得到最終結果 get()方法 是阻塞的
        // 優化方案:可以採用CompletableFuture來優化  JDK1.8的新特性
        long total = 0L;
        for (Future<Long> f : results) {
            try {
                total += f.get();
            } catch (Exception ignore) {
            }
        }

        return total;
    }
}

main方法改為:

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();

        Instant start = Instant.now();
        Calculator calculator = new ExecutorServiceCalculator();
        long result = calculator.sumUp(numbers);
        Instant end = Instant.now();
        System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

        System.out.println("結果為:" + result); // 列印結果500500
    }
輸出:
耗時:30ms
結果為:50000005000000
  • 方案三:採用ForkJoinPool(Fork/Join)
    前面花了點時間講解了 ForkJoinPool 之前的實現方法,主要為了在程式碼的編寫難度上進行一下對比。現在就列出本篇文章的重點——ForkJoinPool 的實現方法。
/**
 * 採用ForkJoin來計算求和
 *  * @author [email protected]
 * @description //
 * @date 2018/11/5 15:09
 */
public class ForkJoinCalculator implements Calculator {

    private ForkJoinPool pool;

    //執行任務RecursiveTask:有返回值  RecursiveAction:無返回值
    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        //此方法為ForkJoin的核心方法:對任務進行拆分  拆分的好壞決定了效率的高低
        @Override
        protected Long compute() {

            // 當需要計算的數字個數小於6時,直接採用for loop方式計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            } else { // 否則,把任務一分為二,遞迴拆分(注意此處有遞迴)到底拆分成多少分 需要根據具體情況而定
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的執行緒池 ForkJoinPool.commonPool():
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
        pool.shutdown();
        return result;
    }
}
輸出:
耗時:390ms
結果為:50000005000000

可以看出,使用了 ForkJoinPool 的實現邏輯全部集中在了 compute() 這個函式裡,僅用了14行就實現了完整的計算過程。特別是,在這段程式碼裡沒有顯式地“把任務分配給執行緒”,只是分解了任務,而把具體的任務到執行緒的對映交給了 ForkJoinPool 來完成。

  • 方案四:採用並行流(JDK8以後的推薦做法)
    public static void main(String[] args) {

        Instant start = Instant.now();
        long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);
        Instant end = Instant.now();
        System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

        System.out.println("結果為:" + result); // 列印結果500500

    }
輸出:
耗時:130ms
結果為:50000005000000

並行流底層還是Fork/Join框架,只是任務拆分優化得很好。

耗時效率方面解釋:Fork/Join 並行流等當計算的數字非常大的時候,優勢才能體現出來。也就是說,如果你的計算比較小,或者不是CPU密集型的任務,不太建議使用並行處理

原理

**我一直以為,要理解一樣東西的原理,最好就是自己嘗試著去實現一遍。**根據上面的示例程式碼,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的關鍵。我們可以根據函式名假設一下 fork() 和 join() 的作用:

  • fork():開啟一個新執行緒(或是重用執行緒池內的空閒執行緒),將任務交給該執行緒處理。
  • join():等待該任務的處理執行緒處理完畢,獲得返回值。

疑問:當任務分解得越來越細時,所需要的執行緒數就會越來越多,而且大部分執行緒處於等待狀態?

但是如果我們在上面的示例程式碼加入以下程式碼

System.out.println(pool.getPoolSize());

這會顯示當前執行緒池的大小,在我的機器上這個值是4,也就是說只有4個工作執行緒。甚至即使我們在初始化 pool 時指定所使用的執行緒數為1時,上述程式也沒有任何問題——除了變成了一個序列程式以外。

public ForkJoinCalculator() {
    pool = new ForkJoinPool(1);
}

這個矛盾可以匯出,我們的假設是錯誤的,並不是每個 fork() 都會促成一個新執行緒被建立,而每個 join() 也不是一定會造成執行緒被阻塞。Fork/Join Framework 的實現演算法並不是那麼“顯然”,而是一個更加複雜的演算法——這個演算法的名字就叫做work stealing 演算法。
在這裡插入圖片描述

  1. ForkJoinPool 的每個工作執行緒都維護著一個工作佇列(WorkQueue),這是一個雙端佇列(Deque),裡面存放的物件是任務(ForkJoinTask)。
  2. 每個工作執行緒在執行中產生新的任務(通常是因為呼叫了 fork())時,會放入工作佇列的隊尾,並且工作執行緒在處理自己的工作佇列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行
  3. 每個工作執行緒在處理自己的工作佇列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作執行緒的工作佇列),竊取的任務位於其他執行緒的工作佇列的隊首,也就是說工作執行緒在竊取其他工作執行緒的任務時,使用的是 FIFO 方式。
  4. 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。
  5. 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

至於Fork和Join原始碼級別的的細節,本文不做過多描述了~~

submit() 和 fork() 其實沒有本質區別,只是提交物件變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作執行緒”竊取“的物件,因此當其中的任務被一個工作執行緒成功竊取時,就意味著提交的任務真正開始進入執行階段。

ForkJoinPool的commonPool相關引數配置

commonPool是ForkJoinPool內建的一個執行緒池物件,JDK8裡有些都是使用它的。他怎麼來的呢?具體原始碼為ForkJoinPool的靜態方法:makeCommonPool

   private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
引數解釋 以及自定義commonPool的引數

通過程式碼指定,必須得在commonPool初始化之前(parallel的stream被呼叫之前,一般可在系統啟動後設置)注入進去,否則無法生效。
通過啟動引數指定無此限制,較為安全

  • parallelism(即配置執行緒池個數)
    可以通過java.util.concurrent.ForkJoinPool.common.parallelism進行配置,最大值不能超過MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767

如果沒有指定,則預設為Runtime.getRuntime().availableProcessors() - 1.
自定義:程式碼指定(必須得在commonPool初始化之前注入進去,否則無法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
// 或者啟動引數指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
  • threadFactory:預設為defaultForkJoinWorkerThreadFactory,沒有securityManager的話。
  • exceptionHandler:如果沒有設定,預設為null
  • WorkQueue:控制是FIFO還是LIFO
    ForkJoinPool 的每個工作執行緒都維護著一個工作佇列(WorkQueue),這是一個雙端佇列(Deque),裡面存放的物件是任務(ForkJoinTask)。
    每個工作執行緒在執行中產生新的任務(通常是因為呼叫了 fork())時,會放入工作佇列的隊尾,並且工作執行緒在處理自己的工作佇列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
    每個工作執行緒在處理自己的工作佇列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool的任務,或是來自於其他工作執行緒的工作佇列),竊取的任務位於其他執行緒的工作佇列的隊首,也就是說工作執行緒在竊取其他工作執行緒的任務時,使用的是 FIFO 方式。
  • queue capacity:佇列容量

繼續介紹

建立了ForkJoinPool例項之後,就可以呼叫ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法來執行指定任務了。
其中ForkJoinTask代表一個可以並行、合併的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。
其中RecusiveTask代表有返回值的任務,
而RecusiveAction代表沒有返回值的任務。

在這裡插入圖片描述

它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。

這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。

比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序

那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。
所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法像任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。

使用ThreadPoolExecutor或者ForkJoinPool,會有什麼效能的差異呢?

使用ForkJoinPool能夠使用數量有限的執行緒來完成非常多的具有父子關係的任務,比如使用4個執行緒來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個執行緒,顯然這是不可行的。

這就是工作竊取模式的優點

總結

在瞭解了 Fork/Join Framework 的工作原理之後,相信很多使用上的注意事項就可以從原理中找到原因。例如:為什麼在 ForkJoinTask 裡最好不要存在 I/O 等會阻塞執行緒的行為?,這個各位讀者可以思考思考了

還有一些延伸閱讀的內容,在此僅提及一下:

  1. ForkJoinPool 有一個 Async Mode ,效果是工作執行緒在處理本地任務時也使用 FIFO 順序。這種模式下的 ForkJoinPool 更接近於是一個訊息佇列,而不是用來處理遞迴式的任務。
  2. 在需要阻塞工作執行緒時,可以使用 ManagedBlocker。
  3. Java 1.8 新增加的 CompletableFuture 類可以實現類似於 Javascript 的 promise-chain,內部就是使用 ForkJoinPool 來實現的。

彩蛋

人們之所以總是忘記使用標準的 Java 物件是因為缺少一個足夠裝逼的名字(譯註:類似於 Java Bean 這樣的名字)。因此,在準備2000年的演講時,Rebecca Parsons,Josh Mackenzie 和我給他們起了一個名字叫做 POJO (平淡無奇的 Java 物件)。

有些東西,有些技術如果沒有一個夠裝逼的名字,也是很難被人們記住的。。。吼吼吼