1. 程式人生 > >如何合理設定執行緒池大小

如何合理設定執行緒池大小

要想合理的配置執行緒池的大小,首先得分析任務的特性,可以從以下幾個角度分析:

  1. 任務的性質:CPU密集型任務、IO密集型任務、混合型任務。
  2. 任務的優先順序:高、中、低。
  3. 任務的執行時間:長、中、短。
  4. 任務的依賴性:是否依賴其他系統資源,如資料庫連線等。

性質不同的任務可以交給不同規模的執行緒池執行。

對於不同性質的任務來說,CPU密集型任務應配置儘可能小的執行緒,如配置CPU個數+1的執行緒數,IO密集型任務應配置儘可能多的執行緒,因為IO操作不佔用CPU,不要讓CPU閒下來,應加大執行緒數量,如配置兩倍CPU個數+1,而對於混合型的任務,如果可以拆分,拆分成IO密集型和CPU密集型分別處理,前提是兩者執行的時間是差不多的,如果處理時間相差很大,則沒必要拆分了。

若任務對其他系統資源有依賴,如某個任務依賴資料庫的連線返回的結果,這時候等待的時間越長,則CPU空閒的時間越長,那麼執行緒數量應設定得越大,才能更好的利用CPU。 當然具體合理執行緒池值大小,需要結合系統實際情況,在大量的嘗試下比較才能得出,以上只是前人總結的規律。

最佳執行緒數目 = ((執行緒等待時間+執行緒CPU時間)/執行緒CPU時間 )* CPU數目

比如平均每個執行緒CPU執行時間為0.5s,而執行緒等待時間(非CPU執行時間,比如IO)為1.5s,CPU核心數為8,那麼根據上面這個公式估算得到:((0.5+1.5)/0.5)*8=32。這個公式進一步轉化為:

最佳執行緒數目 = (執行緒等待時間與執行緒CPU時間之比 + 1)* CPU數目

可以得出一個結論:執行緒等待時間所佔比例越高,需要越多執行緒。執行緒CPU時間所佔比例越高,需要越少執行緒。 以上公式與之前的CPU和IO密集型任務設定執行緒數基本吻合。

併發程式設計網上的一個問題 高併發、任務執行時間短的業務怎樣使用執行緒池?併發不高、任務執行時間長的業務怎樣使用執行緒池?併發高、業務執行時間長的業務怎樣使用執行緒池? (1)高併發、任務執行時間短的業務,執行緒池執行緒數可以設定為CPU核數+1,減少執行緒上下文的切換 (2)併發不高、任務執行時間長的業務要區分開看:   a)假如是業務時間長集中在IO操作上,也就是IO密集型的任務,因為IO操作並不佔用CPU,所以不要讓所有的CPU閒下來,可以適當加大執行緒池中的執行緒數目,讓CPU處理更多的業務   b)假如是業務時間長集中在計算操作上,也就是計算密集型任務,這個就沒辦法了,和(1)一樣吧,執行緒池中的執行緒數設定得少一些,減少執行緒上下文的切換 (3)併發高、業務執行時間長,解決這種型別任務的關鍵不在於執行緒池而在於整體架構的設計,看看這些業務裡面某些資料是否能做快取是第一步,增加伺服器是第二步,至於執行緒池的設定,設定參考(2)。最後,業務執行時間長的問題,也可能需要分析一下,看看能不能使用中介軟體對任務進行拆分和解耦。

如何合理地估算執行緒池大小?

這個問題雖然看起來很小,卻並不那麼容易回答。大家如果有更好的方法歡迎賜教,先來一個天真的估算方法:假設要求一個系統的TPS(Transaction Per Second或者Task Per Second)至少為20,然後假設每個Transaction由一個執行緒完成,繼續假設平均每個執行緒處理一個Transaction的時間為4s。那麼問題轉化為:

如何設計執行緒池大小,使得可以在1s內處理完20個Transaction?

計算過程很簡單,每個執行緒的處理能力為0.25TPS,那麼要達到20TPS,顯然需要20/0.25=80個執行緒。

很顯然這個估算方法很天真,因為它沒有考慮到CPU數目。一般伺服器的CPU核數為16或者32,如果有80個執行緒,那麼肯定會帶來太多不必要的執行緒上下文切換開銷。 再來第二種簡單的但不知是否可行的方法(N為CPU總核數):

  • 如果是CPU密集型應用,則執行緒池大小設定為N+1
  • 如果是IO密集型應用,則執行緒池大小設定為2N+1

如果一臺伺服器上只部署這一個應用並且只有這一個執行緒池,那麼這種估算或許合理,具體還需自行測試驗證。

接下來在這個文件:伺服器效能IO優化 中發現一個估算公式:

1 最佳執行緒數目 = ((執行緒等待時間+執行緒CPU時間)/執行緒CPU時間 )* CPU數目

比如平均每個執行緒CPU執行時間為0.5s,而執行緒等待時間(非CPU執行時間,比如IO)為1.5s,CPU核心數為8,那麼根據上面這個公式估算得到:((0.5+1.5)/0.5)*8=32。這個公式進一步轉化為:

1 最佳執行緒數目 = (執行緒等待時間與執行緒CPU時間之比 + 1)* CPU數目

可以得出一個結論:

執行緒等待時間所佔比例越高,需要越多執行緒。執行緒CPU時間所佔比例越高,需要越少執行緒。

上一種估算方法也和這個結論相合。

一個系統最快的部分是CPU,所以決定一個系統吞吐量上限的是CPU。增強CPU處理能力,可以提高系統吞吐量上限。但根據短板效應,真實的系統吞吐量並不能單純根據CPU來計算。那要提高系統吞吐量,就需要從“系統短板”(比如網路延遲、IO)著手:

  • 儘量提高短板操作的並行化比率,比如多執行緒下載技術
  • 增強短板能力,比如用NIO替代IO

第一條可以聯絡到Amdahl定律,這條定律定義了序列系統並行化後的加速比計算公式:

1 加速比=優化前系統耗時 / 優化後系統耗時

加速比越大,表明系統並行化的優化效果越好。Addahl定律還給出了系統並行度、CPU數目和加速比的關係,加速比為Speedup,系統序列化比率(指序列執行程式碼所佔比率)為F,CPU數目為N:

1 Speedup <= 1 / (F + (1-F)/N)

當N足夠大時,序列化比率F越小,加速比Speedup越大。

寫到這裡,我突然冒出一個問題。

是否使用執行緒池就一定比使用單執行緒高效呢?

答案是否定的,比如Redis就是單執行緒的,但它卻非常高效,基本操作都能達到十萬量級/s。從執行緒這個角度來看,部分原因在於:

  • 多執行緒帶來執行緒上下文切換開銷,單執行緒就沒有這種開銷

當然“Redis很快”更本質的原因在於:Redis基本都是記憶體操作,這種情況下單執行緒可以很高效地利用CPU。而多執行緒適用場景一般是:存在相當比例的IO和網路操作。

所以即使有上面的簡單估算方法,也許看似合理,但實際上也未必合理,都需要結合系統真實情況(比如是IO密集型或者是CPU密集型或者是純記憶體操作)和硬體環境(CPU、記憶體、硬碟讀寫速度、網路狀況等)來不斷嘗試達到一個符合實際的合理估算值。

最後來一個“Dark Magic”估算方法(因為我暫時還沒有搞懂它的原理),使用下面的類:

001 package pool_size_calculate;
002
003 import java.math.BigDecimal;
004 import java.math.RoundingMode;
005 import java.util.Timer;
006 import java.util.TimerTask;
007 import java.util.concurrent.BlockingQueue;
008
009 /**
010 * A class that calculates the optimal thread pool boundaries. It takes the
011 * desired target utilization and the desired work queue memory consumption as
012 * input and retuns thread count and work queue capacity.
013 *
014 * @author Niklas Schlimm
015 *
016 */
017 public abstract class PoolSizeCalculator {
018
019 /**
020 * The sample queue size to calculate the size of a single {@link Runnable}
021 * element.
022 */
023 private final int SAMPLE_QUEUE_SIZE = 1000;
024
025 /**
026 * Accuracy of test run. It must finish within 20ms of the testTime
027 * otherwise we retry the test. This could be configurable.
028 */
029 private final int EPSYLON = 20;
030
031 /**
032 * Control variable for the CPU time investigation.
033 */
034 private volatile boolean expired;
035
036 /**
037 * Time (millis) of the test run in the CPU time calculation.
038 */
039 private final long testtime = 3000;
040
041 /**
042 * Calculates the boundaries of a thread pool for a given {@link Runnable}.
043 *
044 * @param targetUtilization
045 *            the desired utilization of the CPUs (0 <= targetUtilization <=   *            1)     * @param targetQueueSizeBytes   *            the desired maximum work queue size of the thread pool (bytes)     */     protected void calculateBoundaries(BigDecimal targetUtilization,            BigDecimal targetQueueSizeBytes) {      calculateOptimalCapacity(targetQueueSizeBytes);         Runnable task = creatTask();        start(task);        start(task); // warm up phase       long cputime = getCurrentThreadCPUTime();       start(task); // test intervall      cputime = getCurrentThreadCPUTime() - cputime;      long waittime = (testtime * 1000000) - cputime;         calculateOptimalThreadCount(cputime, waittime, targetUtilization);  }   private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {        long mem = calculateMemoryUsage();      BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(              mem), RoundingMode.HALF_UP);        System.out.println("Target queue memory usage (bytes): "                + targetQueueSizeBytes);        System.out.println("createTask() produced "                 + creatTask().getClass().getName() + " which took " + mem               + " bytes in a queue");         System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);       System.out.println("* Recommended queue capacity (bytes): "                 + queueCapacity);   }   /**      * Brian Goetz' optimal thread count formula, see 'Java Concurrency in   * Practice' (chapter 8.2)   *       * @param cpu    *            cpu time consumed by considered task   * @param wait   *            wait time of considered task   * @param targetUtilization      *            target utilization of the system   */     private void calculateOptimalThreadCount(long cpu, long wait,           BigDecimal targetUtilization) {         BigDecimal waitTime = new BigDecimal(wait);         BigDecimal computeTime = new BigDecimal(cpu);       BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime()                .availableProcessors());        BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization)                 .multiply(                      new BigDecimal(1).add(waitTime.divide(computeTime,                              RoundingMode.HALF_UP)));        System.out.println("Number of CPU: " + numberOfCPU);        System.out.println("Target utilization: " + targetUtilization);         System.out.println("Elapsed time (nanos): " + (testtime * 1000000));        System.out.println("Compute time (nanos): " + cpu);         System.out.println("Wait time (nanos): " + wait);       System.out.println("Formula: " + numberOfCPU + " * "                + targetUtilization + " * (1 + " + waitTime + " / "                 + computeTime + ")");       System.out.println("* Optimal thread count: " + optimalthreadcount);    }   /**      * Runs the {@link Runnable} over a period defined in {@link #testtime}.     * Based on Heinz Kabbutz' ideas     * (http://www.javaspecialists.eu/archive/Issue124.html).    *       * @param task   *            the runnable under investigation   */     public void start(Runnable task) {      long start = 0;         int runs = 0;       do {            if (++runs > 5) {
046 throw new IllegalStateException("Test not accurate");
047 }
048 expired = false;
049 start = System.currentTimeMillis();
050 Timer timer = new Timer();
051 timer.schedule(new TimerTask() {
052 public void run() {
053 expired = true;
054 }
055 }, testtime);
056 while (!expired) {
057 task.run();
058 }
059 start = System.currentTimeMillis() - start;
060 timer.cancel();
061 } while (Math.abs(start - testtime) > EPSYLON);
062 collectGarbage(3);
063 }
064
065 private void collectGarbage(int times) {
066 for (int i = 0; i < times; i++) {
067 System.gc();
068 try {
069 Thread.sleep(10);
070 } catch (InterruptedException e) {
071 Thread.currentThread().interrupt();
072 break;
073 }
074 }
075 }
076
077 /**
078 * Calculates the memory usage of a single element in a work queue. Based on
079 * Heinz Kabbutz' ideas
081 *
082 * @return memory usage of a single {@link Runnable} element in the thread
083 *         pools work queue
084 */
085 public long calculateMemoryUsage() {
086 BlockingQueue queue = createWorkQueue();
087 for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
088 queue.add(creatTask());
089 }
090 long mem0 = Runtime.getRuntime().totalMemory()
091 - Runtime.getRuntime().freeMemory();
092 long mem1 = Runtime.getRuntime().totalMemory()
093 - Runtime.getRuntime().freeMemory();
094 queue = null;
095 collectGarbage(15);
096 mem0 = Runtime.getRuntime().totalMemory()
097 - Runtime.getRuntime().freeMemory();
098 queue = createWorkQueue();
099 for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
100 queue.add(creatTask());
101 }
102 collectGarbage(15);
103 mem1 = Runtime.getRuntime().totalMemory()
104 - Runtime.getRuntime().freeMemory();
105 return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
106 }
107
108 /**
109 * Create your runnable task here.
110 *
111 * @return an instance of your runnable task under investigation
112 */
113 protected abstract Runnable creatTask();
114
115 /**
116 * Return an instance of the queue used in the thread pool.
117 *
118 * @return queue instance
119 */
120 protected abstract BlockingQueue createWorkQueue();
121
122 /**
123 * Calculate current cpu time. Various frameworks may be used here,
124 * depending on the operating system in use. (e.g.
126 * measurement, the more accurate the results for thread count boundaries.
127 *
128 * @return current cpu time of current thread
129 */
130 protected abstract long getCurrentThreadCPUTime();
131
132 }

然後自己繼承這個抽象類並實現它的三個抽象方法,比如下面是我寫的一個示例(任務是請求網路資料),其中我指定期望CPU利用率為1.0(即100%),任務佇列總大小不超過100,000位元組:

01 package pool_size_calculate;
02
03 import java.io.BufferedReader;
04 import java.io.IOException;
05 import java.io.InputStreamReader;
06 import java.lang.management.ManagementFactory;
07 import java.math.BigDecimal;
08 import java.net.HttpURLConnection;
09 import java.net.URL;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12
13 public class SimplePoolSizeCaculatorImpl extends PoolSizeCalculator {
14
15 @Override
16 protected Runnable creatTask() {
17 return new AsyncIOTask();
18 }
19
20 @Override
21 protected BlockingQueue createWorkQueue() {
22 return new LinkedBlockingQueue(1000);
23 }
24
25 @Override
26 protected long getCurrentThreadCPUTime() {
27 return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
28 }
29
30 public static void main(String[] args) {
31 PoolSizeCalculator poolSizeCalculator = new SimplePoolSizeCaculatorImpl();
32 poolSizeCalculator.calculateBoundaries(new BigDecimal(1.0), new BigDecimal(100000));
33 }
34
35 }
36
37 /**
38 * 自定義的非同步IO任務
39 * @author Will
40 *
41 */
42 class AsyncIOTask implements Runnable {
43
44 @Override
45 public void run() {
46 HttpURLConnection connection = null;
47 BufferedReader reader = null;
48 try {
50 URL getUrl = new URL(getURL);
51
52 connection = (HttpURLConnection) getUrl.openConnection();
53 connection.connect();
54 reader = new BufferedReader(new InputStreamReader(
55 connection.getInputStream()));
56
57 String line;
58 while ((line = reader.readLine()) != null) {
59 // empty loop
60 }
61 }
62
63 catch (IOException e) {
64
65 } finally {
66 if(reader != null) {
67 try {
68 reader.close();
69 }
70 catch(Exception e) {
71
72 }
73 }
74 connection.disconnect();
75 }
76
77 }
78
79 }

得到的輸出如下:

01 Target queue memory usage (bytes): 100000
02 createTask() produced pool_size_calculate.AsyncIOTask which took 40 bytes in a queue
03 Formula: 100000 / 40
04 * Recommended queue capacity (bytes): 2500
05 Number of CPU: 4
06 Target utilization: 1
07 Elapsed time (nanos): 3000000000
08 Compute time (nanos): 47181000
09 Wait time (nanos): 2952819000
10 Formula: 4 * 1 * (1 + 2952819000 / 47181000)
11 * Optimal thread count: 256

推薦的任務佇列大小為2500,執行緒數為256,有點出乎意料之外。我可以如下構造一個執行緒池:

1 ThreadPoolExecutor pool =
2 new ThreadPoolExecutor(256, 256, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2500));