1. 程式人生 > >執行緒及同步的效能 – 執行緒池/ ThreadPoolExecutors/ ForkJoinPool

執行緒及同步的效能 – 執行緒池/ ThreadPoolExecutors/ ForkJoinPool

執行緒池和ThreadPoolExecutors

雖然在程式中可以直接使用Thread型別來進行執行緒操作,但是更多的情況是使用執行緒池,尤其是在Java EE應用伺服器中,一般會使用若干個執行緒池來處理來自客戶端的請求。Java中對於執行緒池的支援,來自ThreadPoolExecutor。一些應用伺服器也確實是使用的ThreadPoolExecutor來實現執行緒池。

對於執行緒池的效能調優,最重要的引數就是執行緒池的大小。

對於任何執行緒池而言,它們的工作方式幾乎都是相同的:

  • 任務被投放到一個佇列中(佇列的數量不定)
  • 執行緒從佇列中取得任務並執行
  • 執行緒完成任務後,繼續嘗試從佇列中取得任務,如果佇列為空,那麼執行緒進入等待狀態

執行緒池往往擁有最小和最大執行緒數:

  • 最小執行緒數,即當任務佇列為空時,執行緒池中最少需要保持的執行緒數量,這樣做是考慮到建立執行緒是一個相對耗費資源的操作,應當儘可能地避免,當有新任務被投入佇列時,總會有執行緒能夠立即對它進行處理。
  • 最大執行緒數,當需要處理的任務過多時,執行緒池能夠擁有的最大執行緒數。這樣是為了保證不會有過多的執行緒被創建出來,因為執行緒的執行需要依賴於CPU資源和其它各種資源,當執行緒過多時,反而會降低效能。

在ThreadPoolExecutor和其相關的型別中,最小執行緒數被稱為執行緒池核心規模(Core Pool Size),在其它Java應用伺服器的實現中,這個數量也許被稱為最小執行緒數(MinThreads),但是它們的概念是相同的。

但是在對執行緒池進行規模變更(Resizing)的時候,ThreadPoolExecutor和其它執行緒池的實現也許存在的很大的差別。

一個最簡單的情況是:當有新任務需要被執行,且當前所有的執行緒都被佔用時,ThreadPoolExecutor和其它實現通常都會新建立一個執行緒來執行這個新任務(直到達到了最大執行緒數)。

設定最大執行緒數

最合適的最大執行緒數該怎麼確定,依賴以下兩個方面:

  • 任務的特徵
  • 計算機的硬體情況

為了方便討論,下面假設JVM有4個可用的CPU。那麼任務也很明確,就是要最大程度地“壓榨”它們的資源,千方百計的提高CPU的利用率。

那麼,最大執行緒數最少需要被設定成4,因為有4個可用的CPU,意味著最多能夠並行地執行4個任務。當然,垃圾回收(Garbage Collection)在這個過程中也會造成一些影響,但是它們往往不需要使用整個CPU。一個例外是,當使用了CMS或者G1垃圾回收演算法時,需要有足夠的CPU資源進行垃圾回收。

那麼是否有必要將執行緒數量設定的更大呢?這就取決於任務的特徵了。

假設當任務是計算密集型的,意味著任務不需要執行IO操作,例如讀取資料庫,讀取檔案等,因此它們不涉及到同步的問題,任務之間完全是獨立的。比如使用一個批處理程式讀取Mock資料來源的資料,測試在不執行緒池擁有不同執行緒數量時的效能,得到下表:

執行緒數 執行時間(秒) 基線百分比
1 255.6 100%
2 134.8 52.7%
4 77.0 30.1%
8 81.7 31.9%
16 85.6 33.5%

從上面中得到一些結論:

  • 當執行緒數為4時,達到最優效能,再增加執行緒數量時並沒有更好的效能,因為此時CPU的利用率已經達到了最高,在增加執行緒只會增加執行緒之間爭奪CPU資源的行為,因此反而降低了效能。
  • 即使在CPU利用率達到最高時,基線百分比也不是理想中的25%,這是因為雖然在程式執行過程中,CPU資源並不是只被應用程式執行緒獨享的,一些後臺執行緒有時也會需要CPU資源,比如GC執行緒和系統的一些執行緒等。

當計算是通過Servlet觸發的時候,效能資料是下面這個樣子的(Load Generator會同時傳送20個請求):

執行緒數 每秒運算元(OPS) 基線百分比
4 77.43 100%
8 75.93 98.8%
16 71.65 92.5%
32 69.34 89.5%
64 60.44 78.1%

從上表中可以得到的結論:

  • 線上程數量為4時,效能最優。因為此任務的型別是計算密集型的,只有4個CPU,因此執行緒數量為4時,達到最優情況。
  • 隨著執行緒數量逐漸增加,效能下降,因為執行緒之間會互相爭奪CPU資源,造成頻繁切換執行緒執行上下文環境,而這些切換隻會浪費CPU資源。
  • 效能下降的速度並不明顯,這也是因為任務型別是計算密集型的緣故,如果效能瓶頸不是CPU提供的計算資源,而是外部的資源,如資料庫,檔案操作等,那麼增加執行緒數量帶來的效能下降也許會更加明顯。

下面,從Client的角度考慮一下問題,併發Client的數量對於Server的響應時間會有什麼影響呢?還是同樣地環境,當併發Client數量逐漸增加時,響應時間會如下發生變化:

併發Client執行緒數 平均響應時間(秒) 基線百分比
1 0.05 100%
2 0.05 100%
4 0.05 100%
6 0.076 152%
8 0.104 208%
16 0.212 424%
32 0.437 874%
64 0.909 1818%

因為任務型別是計算密集型的,當併發Client數量時1,2,4時,平均響應時間都是最優的,然而當出現多餘4個Client時,效能會隨著Client的增加發生顯著地下降。

當Client數量增加時,你也許會想通過增加服務端執行緒池的執行緒數量來提高效能,可是在CPU密集型任務的情況下,這麼做只會降低效能。因為系統的瓶頸就是CPU資源,冒然增加執行緒池的執行緒數量只會讓對於這種資源的競爭更加激烈。

所以,在面對效能方面的問題時。第一步永遠是瞭解系統的瓶頸在哪裡,這樣才能夠有的放矢。如果冒然進行所謂的“調優”,讓對瓶頸資源的競爭更加激烈,那麼帶來的只會是效能的進一步下降。相反,如果讓對瓶頸資源的競爭變的緩和,那麼效能通常則會提高。

在上面的場景中,如果從ThreadPoolExecutor的角度進行考慮,那麼在任務佇列中一直會有任務處於掛起(Pending)的狀態(因為Client的每個請求對應的就是一個任務),而所有的可用執行緒都在工作,CPU正在滿負荷運轉。這個時候新增執行緒池的執行緒數量,讓這些新增的執行緒領取一些掛起的任務,會發生什麼事情呢?這時帶來的只會是執行緒之間對於CPU資源的爭奪更加激烈,降低了效能。

設定最小執行緒數

設定了最大執行緒數之後,還需要設定最小執行緒數。對於絕大部分場景,將它設定的和最大執行緒數相等就可以了。

將最小執行緒數設定的小於最大執行緒數的初衷是為了節省資源,因為每多建立一個執行緒都會耗費一定量的資源,尤其是執行緒棧所需要的資源。但是在一個系統中,針對硬體資源以及任務特點選定了最大執行緒數之後,就表示這個系統總是會利用這些執行緒的,那麼還不如在一開始就讓執行緒池把需要的執行緒準備好。然而,把最小執行緒數設定的小於最大執行緒數所帶來的影響也是非常小的,一般都不會察覺到有什麼不同。

在批處理程式中,最小執行緒數是否等於最大執行緒數並不重要。因為最後執行緒總是需要被創建出來的,所以程式的執行時間應該幾乎相同。對於伺服器程式而言,影響也不大,但是一般而言,執行緒池中的執行緒在“熱身”階段就應該被創建出來,所以這也是為什麼建議將最小執行緒數設定的等於最大執行緒數的原因。

在一些場景中,也需要要設定一個不同的最小執行緒數。比如當一個系統最大需要同時處理2000個任務,而平均任務數量只是20個情況下,就需要將最小執行緒數設定成20,而不是等於其最大執行緒數2000。此時如果還是將最小執行緒數設定的等於最大執行緒數的話,那麼閒置執行緒(Idle Thread)佔用的資源就比較可觀了,尤其是當使用了ThreadLocal型別的變數時。

執行緒池任務數量(Thread Pool Task Sizes)

執行緒池有一個列表或者佇列的資料結構來存放需要被執行的任務。顯然,在某些情況下,任務數量的增長速度會大於其被執行的速度。如果這個任務代表的是一個來自Client的請求,那麼也就意味著該Client會等待比較長的時間。顯然這是不可接受的,尤其對於提供Web服務的伺服器程式而言。

所以,執行緒池會有機制來限制列表/佇列中任務的數量。但是,和設定最大執行緒數一樣,並沒有一個放之四海而皆準的最優任務數量。這還是要取決於具體的任務型別和不斷的進行效能測試。

對於ThreadPoolExecutor而言,當任務數量達到最大時,再嘗試增加新的任務就會失敗。ThreadPoolExecutor有一個rejectedExecution方法用來拒絕該任務。這會導致應用伺服器返回一個HTTP狀態碼500,當然這種資訊最好以更友好的方式傳達給Client,比如解釋一下為什麼你的請求被拒絕了。

定製ThreadPoolExecutor

執行緒池在同時滿足以下三個條件時,就會建立一個新的執行緒:

  • 有任務需要被執行
  • 當前執行緒池中所有的執行緒都處於工作狀態
  • 當前執行緒池的執行緒數沒有達到最大執行緒數

至於執行緒池會如何建立這個新的執行緒,則是根據任務佇列的種類:

  • 任務佇列是 SynchronousQueue 這個佇列的特點是,它並不能放置任何任務在其佇列中,當有任務被提交時,使用SynchronousQueue的執行緒池會立即為該任務建立一個執行緒(如果執行緒數量沒有達到最大時,如果達到了最大,那麼該任務會被拒絕)。這種佇列適合於當任務數量較小時採用。也就是說,在使用這種佇列時,未被執行的任務沒有一個容器來暫時儲存。
  • 任務佇列是 無限佇列(Unbound Queue) 無界限的佇列可以是諸如LinkedBlockingQueue這種型別,在這種情況下,任何被提交的任務都不會被拒絕。但是執行緒池會忽略最大執行緒數這一引數,意味著執行緒池的最大執行緒數就變成了設定的最小執行緒數。所以在使用這種佇列時,通常會將最大執行緒數設定的和最小執行緒數相等。這就相當於使用了一個固定了執行緒數量的執行緒池。
  • 任務佇列是 有限佇列(Bounded Queue) 當使用的佇列是諸如ArrayBlockingQueue這種有限佇列的時候,來決定什麼時候建立新執行緒的演算法就相對複雜一些了。比如,最小執行緒數是4,最大執行緒數是8,任務佇列最多能夠容納10個任務。在這種情況下,當任務逐漸被新增到佇列中,直到佇列被佔滿(10個任務),此時執行緒池中的工作執行緒仍然只有4個,即最小執行緒數。只有當仍然有任務希望被放置到佇列中的時候,執行緒池才會新建立一個執行緒並從佇列頭部拿走一個任務,以騰出位置來容納這個最新被提交的任務。

關於如何定製ThreadPoolExecutor,遵循KISS原則(Keep It Simple, Stupid)就好了。比如將最大執行緒數和最小執行緒數設定的相等,然後根據情況選擇有限佇列或者無限佇列。

總結

  1. 執行緒池是物件池的一個有用的例子,它能夠節省在建立它們時候的資源開銷。並且執行緒池對系統中的執行緒數量也起到了很好的限制作用。
  2. 執行緒池中的執行緒數量必須仔細的設定,否則冒然增加執行緒數量只會帶來效能的下降。
  3. 在定製ThreadPoolExecutor時,遵循KISS原則,通常情況下會提供最好的效能。

ForkJoinPool

在Java 7中引入了一種新的執行緒池:ForkJoinPool。

它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入希望的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool需要使用相對少的執行緒來處理大量的任務。比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。

那麼到最後,所有的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法像任務佇列中再新增一個任務並且在等待該任務完成之後再繼續執行。而使用ForkJoinPool時,就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。

比如,我們需要統計一個double陣列中小於0.5的元素的個數,那麼可以使用ForkJoinPool進行實現如下:

public class ForkJoinTest {
    private double[] d;
    private class ForkJoinTask extends RecursiveTask<Integer> {
        private int first;
        private int last;
        public ForkJoinTask(int first, int last) {
            this.first = first;
            this.last = last;
        }
        protected Integer compute() {
            int subCount;
            if (last - first < 10) {
                subCount = 0;
                for (int i = first; i <= last; i++) {
                    if (d[i] < 0.5)
                        subCount++;
                    }
                }
            else {
                int mid = (first + last) >>> 1;
                ForkJoinTask left = new ForkJoinTask(first, mid);
                left.fork();
                ForkJoinTask right = new ForkJoinTask(mid + 1, last);
                right.fork();
                subCount = left.join();
                subCount += right.join();
            }
            return subCount;
        }
    }
    public static void main(String[] args) {
        d = createArrayOfRandomDoubles();
        int n = new ForkJoinPool().invoke(new ForkJoinTask(0, 9999999));
        System.out.println("Found " + n + " values");
    }
}

以上的關鍵是fork()和join()方法。在ForkJoinPool使用的執行緒中,會使用一個內部佇列來對需要執行的任務以及子任務進行操作來保證它們的執行順序。

那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼效能的差異呢?

首先,使用ForkJoinPool能夠使用數量有限的執行緒來完成非常多的具有父子關係的任務,比如使用4個執行緒來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個執行緒,顯然這是不可行的。

當然,在上面的例子中,也可以不使用分治法,因為任務之間的獨立性,可以將整個陣列劃分為幾個區域,然後使用ThreadPoolExecutor來解決,這種辦法不會建立數量龐大的子任務。程式碼如下:

public class ThreadPoolTest {
    private double[] d;
    private class ThreadPoolExecutorTask implements Callable<Integer> {
        private int first;
        private int last;
        public ThreadPoolExecutorTask(int first, int last) {
            this.first = first;
            this.last = last;
        }
        public Integer call() {
            int subCount = 0;
            for (int i = first; i <= last; i++) {
                if (d[i] < 0.5) {
                    subCount++;
                }
            }
            return subCount;
        }
    }
    public static void main(String[] args) {
        d = createArrayOfRandomDoubles();
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(4, 4, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue());
        Future[] f = new Future[4];
        int size = d.length / 4;
        for (int i = 0; i < 3; i++) {
            f[i] = tpe.submit(new ThreadPoolExecutorTask(i * size, (i + 1) * size - 1);
        }
        f[3] = tpe.submit(new ThreadPoolExecutorTask(3 * size, d.length - 1);
        int n = 0;
        for (int i = 0; i < 4; i++) {
            n += f.get();
        }
        System.out.println("Found " + n + " values");
    }
}

在分別使用ForkJoinPool和ThreadPoolExecutor時,它們處理這個問題的時間如下:

執行緒數 ForkJoinPool ThreadPoolExecutor
1 3.2s 0.31s
4 1.9s 0.15s

對執行過程中的GC同樣也進行了監控,發現在使用ForkJoinPool時,總的GC時間花去了1.2s,而ThreadPoolExecutor並沒有觸發任何的GC操作。這是因為在ForkJoinPool的執行過程中,會建立大量的子任務。而當他們執行完畢之後,會被垃圾回收。反之,ThreadPoolExecutor則不會建立任何的子任務,因此不會導致任何的GC操作。

ForkJoinPool的另外一個特性是它能夠實現工作竊取(Work Stealing),在該執行緒池的每個執行緒中會維護一個佇列來存放需要被執行的任務。當執行緒自身佇列中的任務都執行完畢後,它會從別的執行緒中拿到未被執行的任務並幫助它執行。

可以通過以下的程式碼來測試ForkJoinPool的Work Stealing特性:

for (int i = first; i <= last; i++) {
    if (d[i] < 0.5) {
        subCount++;
    }
    for (int j = 0; j < d.length - i; j++) {
        for (int k = 0; k < 100; k++) {
            dummy = j * k + i; // dummy is volatile, so multiple writes occur
            d[i] = dummy;
        }
    }
}

因為裡層的迴圈次數(j)是依賴於外層的i的值的,所以這段程式碼的執行時間依賴於i的值。當i = 0時,執行時間最長,而i = last時執行時間最短。也就意味著任務的工作量是不一樣的,當i的值較小時,任務的工作量大,隨著i逐漸增加,任務的工作量變小。因此這是一個典型的任務負載不均衡的場景。

這時,選擇ThreadPoolExecutor就不合適了,因為它其中的執行緒並不會關注每個任務之間任務量的差異。當執行任務量最小的任務的執行緒執行完畢後,它就會處於空閒的狀態(Idle),等待任務量最大的任務執行完畢。

而ForkJoinPool的情況就不同了,即使任務的工作量有差別,當某個執行緒在執行工作量大的任務時,其他的空閒執行緒會幫助它完成剩下的任務。因此,提高了執行緒的利用率,從而提高了整體效能。

這兩種執行緒池對於任務工作量不均衡時的執行時間:

執行緒數 ForkJoinPool ThreadPoolExecutor
1 54.5s 53.3s
4 16.6s 24.2s

注意到當執行緒數量為1時,兩者的執行時間差異並不明顯。這是因為總的計算量是相同的,而ForkJoinPool慢的那一秒多是因為它建立了非常多的任務,同時也導致了GC的工作量增加。

當執行緒數量增加到4時,執行時間的區別就較大了,ForkJoinPool的效能比ThreadPoolExecutor好將近50%,可見Work Stealing在應對任務量不均衡的情況下,能夠保證資源的利用率。

所以一個結論就是:當任務的任務量均衡時,選擇ThreadPoolExecutor往往更好,反之則選擇ForkJoinPool。

另外,對於ForkJoinPool,還有一個因素會影響它的效能,就是停止進行任務分割的那個閾值。比如在之前的快速排序中,當剩下的元素數量小於10的時候,就會停止子任務的建立。下表顯示了在不同閾值下,ForkJoinPool的效能:

執行緒數 ForkJoinPool
20 17.8s
10 16.6s
5 15.6s
1 16.8s

可以發現,當閾值不同時,對於效能也會有一定影響。因此,在使用ForkJoinPool時,對此閾值進行測試,使用一個最合適的值也有助於整體效能。

自動並行化(Automatic Parallelization)

在Java 8中,引入了自動並行化的概念。它能夠讓一部分Java程式碼自動地以並行的方式執行,前提是使用了ForkJoinPool。

Java 8為ForkJoinPool添加了一個通用執行緒池,這個執行緒池用來處理那些沒有被顯式提交到任何執行緒池的任務。它是ForkJoinPool型別上的一個靜態元素,它擁有的預設執行緒數量等於執行計算機上的處理器數量。

當呼叫Arrays類上新增的新方法時,自動並行化就會發生。比如用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新新增的Stream API中。

比如下面的程式碼用來遍歷列表中的元素並執行需要的計算:

Stream<Integer> stream = arrayList.parallelStream();
stream.forEach(a -> {
    String symbol = StockPriceUtils.makeSymbol(a);
    StockPriceHistory sph = new StockPriceHistoryImpl(symbol, startDate, endDate, entityManager);
});

對於列表中的元素的計算都會以並行的方式執行。forEach方法會為每個元素的計算操作建立一個任務,該任務會被前文中提到的ForkJoinPool中的通用執行緒池處理。以上的平行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就程式碼的可讀性和程式碼量而言,使用ForkJoinPool明顯更勝一籌。

對於ForkJoinPool通用執行緒池的執行緒數量,通常使用預設值就可以了,即執行時計算機的處理器數量。如果需要調整執行緒數量,可以通過設定系統屬性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N

下面的一組資料用來比較使用ThreadPoolExecutor和ForkJoinPool中的通用執行緒池來完成上面簡單計算時的效能:

執行緒數 ThreadPoolExecutor(秒) ForkJoinPool Common Pool(秒)
1 255.6 135.4
2 134.8 110.2
4 77.0 96.5
8 81.7 84.0
16 85.6 84.6

注意到當執行緒數為1,2,4時,效能差異的比較明顯。執行緒數為1的ForkJoinPool通用執行緒池和執行緒數為2的ThreadPoolExecutor的效能十分接近。

出現這種現象的原因是,forEach方法用了一些小把戲。它會將執行forEach本身的執行緒也作為執行緒池中的一個工作執行緒。因此,即使將ForkJoinPool的通用執行緒池的執行緒數量設定為1,實際上也會有2個工作執行緒。因此在使用forEach的時候,執行緒數為1的ForkJoinPool通用執行緒池和執行緒數為2的ThreadPoolExecutor是等價的。

所以當ForkJoinPool通用執行緒池實際需要4個工作執行緒時,可以將它設定成3,那麼在執行時可用的工作執行緒就是4了。

總結

  1. 當需要處理遞迴分治演算法時,考慮使用ForkJoinPool。
  2. 仔細設定不再進行任務劃分的閾值,這個閾值對效能有影響。
  3. Java 8中的一些特性會使用到ForkJoinPool中的通用執行緒池。在某些場合下,需要調整該執行緒池的預設的執行緒數量。