1. 程式人生 > >完全理解Java:Fork/Join框架

完全理解Java:Fork/Join框架

前言

Java 1.7 引入了一種新的併發框架—— Fork/Join Framework。

本文的主要目的是介紹 ForkJoinPool 的適用場景,實現原理,以及示例程式碼。

TLDR; 如果覺得文章太長的話,以下就是結論

  • ForkJoinPool 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下效能比 ExecutorService 更好。(見  )
  • ForkJoinPool 主要用於實現“分而治之”的演算法,特別是分治之後遞迴呼叫的函式,例如 quick sort 等。
  • ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O,執行緒間同步,sleep()
     等會造成執行緒長時間阻塞的情況時,最好配合使用 ManagedBlocker

使用

首先介紹的是大家最關心的 Fork/Join Framework 的使用方法,如果對使用方法已經很熟悉的話,可以跳過這一節,直接閱讀原理

用一個特別簡單的求整數陣列所有元素之和來作為我們現在需要解決的問題吧。

問題

計算1至1000的正整數之和。

解決方法

For-loop

最簡單的,顯然是不使用任何並行程式設計的手段,只用最直白的 for-loop 來實現。下面就是具體的實現程式碼。

不過為了便於橫向對比,也為了讓程式碼更加 Java Style,首先我們先定義一個 interface。

public interface Calculator {
    long sumUp(long[] numbers);
}

這個 interface 非常簡單,只有一個函式 sumUp,就是返回陣列內所有元素的和。

再寫一個 main 方法。

public class Main {
    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new MyCalculator();
        System.out.println(calculator.sumUp(numbers)); // 列印結果500500
    }
}

接下來就是我們的 Plain Old For-loop Calculator,簡稱 POFLC 的實現了。(這其實是個段子,和主題完全無關,感興趣的請見文末的彩蛋

public class ForLoopCalculator implements Calculator {
    public long sumUp(long[] numbers) {
        long total = 0;
        for (long i : numbers) {
            total += i;
        }
        return total;
    }
}

這段程式碼毫無出奇之處,也就不多解釋了,直接跳入下一節——平行計算。

ExecutorService

在 Java 1.5 引入 ExecutorService 之後,基本上已經不推薦直接建立 Thread 物件,而是統一使用 ExecutorService。畢竟從介面的易用程度上來說 ExecutorService 就遠勝於原始的 Thread,更不用提 java.util.concurrent 提供的數種執行緒池,Future 類,Lock 類等各種便利工具。

使用 ExecutorService 的實現

public class ExecutorServiceCalculator implements Calculator {
    private int parallism;
    private ExecutorService pool;

    public ExecutorServiceCalculator() {
        parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心數
        pool = Executors.newFixedThreadPool(parallism);
    }

    private static class SumTask implements Callable<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() throws Exception {
            long total = 0;
            for (int i = from; i <= to; i++) {
                total += numbers[i];
            }
            return total;
        }
    }

    @Override
    public long sumUp(long[] numbers) {
        List<Future<Long>> results = new ArrayList<>();

        // 把任務分解為 n 份,交給 n 個執行緒處理
        int part = numbers.length / parallism;
        for (int i = 0; i < parallism; i++) {
            int from = i * part;
            int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1;
            results.add(pool.submit(new SumTask(numbers, from, to)));
        }

        // 把每個執行緒的結果相加,得到最終結果
        long total = 0L;
        for (Future<Long> f : results) {
            try {
                total += f.get();
            } catch (Exception ignore) {}
        }

        return total;
    }
}

如果對 ExecutorService 不太熟悉的話,推薦閱讀《七天七併發模型》的第二章,對 Java 的多執行緒程式設計基礎講解得比較清晰。當然著名的《Java併發程式設計實戰》也是不可多得的好書。

ForkJoinPool

前面花了點時間講解了 ForkJoinPool 之前的實現方法,主要為了在程式碼的編寫難度上進行一下對比。現在就列出本篇文章的重點——ForkJoinPool 的實現方法。

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當需要計算的數字小於6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否則,把任務一分為二,遞迴計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

可以看出,使用了 ForkJoinPool 的實現邏輯全部集中在了 compute() 這個函式裡,僅用了14行就實現了完整的計算過程。特別是,在這段程式碼裡沒有顯式地“把任務分配給執行緒”,只是分解了任務,而把具體的任務到執行緒的對映交給了 ForkJoinPool 來完成。

原理

如果你除了 ForkJoinPool 的用法以外,對 ForkJoinPoll 的原理也感興趣的話,那麼請接著閱讀這一節。在這一節中,我會結合 ForkJoinPool 的作者 Doug Lea 的論文——《A Java Fork/Join Framework》,儘可能通俗地解釋 Fork/Join Framework 的原理。

我一直以為,要理解一樣東西的原理,最好就是自己嘗試著去實現一遍。根據上面的示例程式碼,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的關鍵。我們可以根據函式名假設一下 fork() 和 join() 的作用:

  • fork():開啟一個新執行緒(或是重用執行緒池內的空閒執行緒),將任務交給該執行緒處理。
  • join():等待該任務的處理執行緒處理完畢,獲得返回值。

以上模型似乎可以(?)解釋 ForkJoinPool 能夠多執行緒執行的事實,但有一個很明顯的問題

當任務分解得越來越細時,所需要的執行緒數就會越來越多,而且大部分執行緒處於等待狀態。

但是如果我們在上面的示例程式碼加入以下程式碼

System.out.println(pool.getPoolSize());

這會顯示當前執行緒池的大小,在我的機器上這個值是4,也就是說只有4個工作執行緒。甚至即使我們在初始化 pool 時指定所使用的執行緒數為1時,上述程式也沒有任何問題——除了變成了一個序列程式以外。

public ForkJoinCalculator() {
    pool = new ForkJoinPool(1);
}

這個矛盾可以匯出,我們的假設是錯誤的,並不是每個 fork() 都會促成一個新執行緒被建立,而每個 join() 也不是一定會造成執行緒被阻塞。Fork/Join Framework 的實現演算法並不是那麼“顯然”,而是一個更加複雜的演算法——這個演算法的名字就叫做 work stealing 演算法。

work stealing 演算法在 Doung Lea 的論文中有詳細的描述,以下是我在結合 Java 1.8 程式碼的閱讀以後——現有程式碼的實現有一部分相比於論文中的描述發生了變化——得到的相對通俗的解釋:

基本思想

  • ForkJoinPool 的每個工作執行緒都維護著一個工作佇列WorkQueue),這是一個雙端佇列(Deque),裡面存放的物件是任務ForkJoinTask)。
  • 每個工作執行緒在執行中產生新的任務(通常是因為呼叫了 fork())時,會放入工作佇列的隊尾,並且工作執行緒在處理自己的工作佇列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
  • 每個工作執行緒在處理自己的工作佇列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作執行緒的工作佇列),竊取的任務位於其他執行緒的工作佇列的隊首,也就是說工作執行緒在竊取其他工作執行緒的任務時,使用的是 FIFO 方式。
  • 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。
  • 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

下面來介紹一下關鍵的兩個函式:fork() 和 join() 的實現細節,相比來說 fork() 比 join() 簡單很多,所以先來介紹 fork()

fork

fork() 做的工作只有一件事,既是把任務推入當前工作執行緒的工作佇列裡。可以參看以下的原始碼:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join

join() 的工作則複雜得多,也是 join() 可以使得執行緒免於被阻塞的原因——不像同名的 Thread.join()

  1. 檢查呼叫 join() 的執行緒是否是 ForkJoinThread 執行緒。如果不是(例如 main 執行緒),則阻塞當前執行緒,等待任務完成。如果是,則不阻塞。
  2. 檢視任務的完成狀態,如果已經完成,直接返回結果。
  3. 如果任務尚未完成,但處於自己的工作佇列內,則完成它。
  4. 如果任務已經被其他的工作執行緒偷走,則竊取這個小偷的工作佇列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。
  5. 如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
  6. 遞迴地執行第5步。

將上述流程畫成序列圖的話就是這個樣子:

以上就是 fork() 和 join() 的原理,這可以解釋 ForkJoinPool 在遞迴過程中的執行邏輯,但還有一個問題

最初的任務是 push 到哪個執行緒的工作佇列裡的?

這就涉及到 submit() 函式的實現方法了

submit

其實除了前面介紹過的每個工作執行緒自己擁有的工作佇列以外,ForkJoinPool 自身也擁有工作佇列,這些工作佇列的作用是用來接收由外部執行緒(非 ForkJoinThread 執行緒)提交過來的任務,而這些工作佇列被稱為 submitting queue 。

submit() 和 fork() 其實沒有本質區別,只是提交物件變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作執行緒”竊取“的物件,因此當其中的任務被一個工作執行緒成功竊取時,就意味著提交的任務真正開始進入執行階段。

總結

在瞭解了 Fork/Join Framework 的工作原理之後,相信很多使用上的注意事項就可以從原理中找到原因。例如:為什麼在 ForkJoinTask 裡最好不要存在 I/O 等會阻塞執行緒的行為?,這個我姑且留作思考題吧 :)

還有一些延伸閱讀的內容,在此僅提及一下:

  1. ForkJoinPool 有一個 Async Mode ,效果是工作執行緒在處理本地任務時也使用 FIFO 順序。這種模式下的 ForkJoinPool 更接近於是一個訊息佇列,而不是用來處理遞迴式的任務。
  2. 在需要阻塞工作執行緒時,可以使用 ManagedBlocker
  3. Java 1.8 新增加的 CompletableFuture 類可以實現類似於 Javascript 的 promise-chain,內部就是使用 ForkJoinPool 來實現的。

彩蛋

之所以煞有介事地取名為 POFLC,顯然是為了模仿 POJO 。而 POJO —— Plain Old Java Object 這個詞是如何產生的,在 stackoverflow 上有個帖子討論過,摘錄一下就是

I’ve come to the conclusion that people forget about regular Java objects because they haven’t got a fancy name. That’s why, while preparing for a talk in 2000, Rebecca Parsons, Josh Mackenzie, and I gave them one: POJOs (plain old Java objects).

我得出一個結論:人們之所以總是忘記使用標準的 Java 物件是因為缺少一個足夠裝逼的名字(譯註:類似於 Java Bean 這樣的名字)。因此,在準備2000年的演講時,Rebecca Parsons,Josh Mackenzie 和我給他們起了一個名字叫做 POJO (平淡無奇的 Java 物件)。

Tags// Java,

More Reading