1. 程式人生 > >線程池的原理及實現

線程池的原理及實現

execute inter void date() 超過 緩沖 線程池大小 exceptio 調整

1、線程池簡介:

多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。

假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。

如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性能。

一個線程池包括以下四個基本組成部分:

  1、線程池管理器(ThreadPool):用於創建並管理線程池,包括 創建線程池,銷毀線程池,添加新任務;

  2、工作線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,可以循環的執行任務;

  3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完後的收尾工作,任務的執行狀態等;

  4、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩沖機制。

線程池技術正是關註如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:

假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小於50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

線程池實現代碼:

  1 package com.wb.thread;
  2 
  3 import java.util.LinkedList;
  4 import java.util.List;
  5 
  6 /**
  7  * 線程池類
  8  * @author wangbo
  9  *
 10  */
 11 public class ThreadPool {
 12     
 13     private static int worker_num = 5;//線程池中線程的個數,默認為5
 14     
 15     private WorkThread[] workthreads;//
工作線程 16 17 private static volatile int finished_task = 0;//未處理的任務 18 19 private List<Runnable> taskQueue = new LinkedList<Runnable>();//任務隊列 20 21 private static ThreadPool threadPool; 22 23 /** 24 * 無參構造器,創建線程池 25 */ 26 private ThreadPool(){ 27 this(5); 28 } 29 30 /** 31 * 含參構造器,創建線程池 32 * @param num 33 */ 34 private ThreadPool(int num){ 35 worker_num = num; 36 workthreads = new WorkThread[num]; 37 for (int i = 0; i < workthreads.length; i++) { 38 workthreads[i] = new WorkThread(); 39 workthreads[i].start();//開啟線程 40 } 41 } 42 43 /** 44 * 獲得一個默認線程個數的線程池 45 * @return 46 */ 47 public static ThreadPool getThreadPool(){ 48 return getThreadPool(ThreadPool.worker_num); 49 } 50 51 /** 52 * 獲得一個指定線程個數的線程池 53 * @param num 54 * @return 55 */ 56 public static ThreadPool getThreadPool(int num) { 57 if (num <= 0) { 58 num = ThreadPool.worker_num; 59 } 60 if (threadPool == null) { 61 threadPool = new ThreadPool(num); 62 } 63 return threadPool; 64 } 65 66 /** 67 * 將任務單個添加到隊列 68 * @param task 69 */ 70 public void execute(Runnable task){ 71 synchronized (taskQueue) { 72 taskQueue.add(task); 73 taskQueue.notify(); 74 } 75 } 76 77 /** 78 * 將任務批量添加到隊列 79 * @param tasks 80 */ 81 public void execute(Runnable[] tasks){ 82 synchronized (taskQueue) { 83 for (Runnable runnable : tasks) { 84 taskQueue.add(runnable); 85 } 86 taskQueue.notify(); 87 } 88 } 89 90 /** 91 * 將任務批量添加到隊列 92 * @param tasks 93 */ 94 public void execute(List<Runnable> tasks){ 95 synchronized (taskQueue) { 96 for (Runnable runnable : tasks) { 97 taskQueue.add(runnable); 98 } 99 taskQueue.notify(); 100 } 101 } 102 103 /** 104 * 銷毀線程池 105 */ 106 public void destroy(){ 107 //還有任務沒有執行完 108 while(!taskQueue.isEmpty()){ 109 try { 110 Thread.sleep(10); 111 } catch (InterruptedException e) { 112 e.printStackTrace(); 113 } 114 } 115 //停止工作線程,且置為null 116 for (int i = 0; i < workthreads.length; i++) { 117 workthreads[i].stopWorker(); 118 workthreads[i] = null; 119 } 120 threadPool = null; 121 taskQueue.clear();//清空隊列 122 } 123 124 /** 125 * 獲取工作線程的個數 126 * @return 127 */ 128 public int getWorkThreadNumber(){ 129 return worker_num; 130 } 131 132 /** 133 * 獲取已完成任務數量 134 * @return 135 */ 136 public int getFinishedTaskNumber(){ 137 return finished_task; 138 } 139 140 /** 141 * 獲取未完成任務數量 142 * @return 143 */ 144 public int getWaitTaskNumber(){ 145 return taskQueue.size(); 146 } 147 148 /** 149 * 獲取線程池信息 150 */ 151 @Override 152 public String toString() { 153 return "工作線程數量:" + getWorkThreadNumber() 154 + ",已完成任務數量" + getFinishedTaskNumber() 155 + ",未完成任務數量" + getWaitTaskNumber(); 156 157 } 158 159 /** 160 * 內部類,工作線程 161 * @author wangbo 162 * 163 */ 164 private class WorkThread extends Thread{ 165 166 private boolean isRunning = true;//線程有效標誌 167 168 @Override 169 public void run() { 170 Runnable runnable = null; 171 while (isRunning) { 172 synchronized (taskQueue) { 173 //隊列為空 174 while (isRunning && taskQueue.isEmpty()) { 175 try { 176 taskQueue.wait(20); 177 } catch (InterruptedException e) { 178 e.printStackTrace(); 179 } 180 } 181 //隊列不為空 182 if (!taskQueue.isEmpty()) { 183 runnable = taskQueue.remove(0);//去除任務 184 } 185 } 186 if (runnable != null) { 187 runnable.run();//執行任務 188 } 189 finished_task++; 190 runnable = null; 191 } 192 193 } 194 195 /** 196 * 停止線程 197 */ 198 public void stopWorker() { 199 isRunning = false; 200 } 201 202 } 203 204 }

測試代碼:

 1 package com.wb.thread;
 2 
 3 public class ThreadPoolTest {
 4 
 5      public static void main(String[] args) {  
 6         // 創建3個線程的線程池  
 7         ThreadPool t = ThreadPool.getThreadPool(3);  
 8         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
 9         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
10         System.out.println(t);  
11         t.destroy();//所有線程都執行完成才destory  
12         System.out.println(t);  
13     }
14       
15     // 任務類  
16     static class Task implements Runnable {
17         
18         private static volatile int i = 1;
19         
20         @Override
21         public void run() {// 執行任務
22             System.out.println("任務 " + (i++) + " 完成");
23         }  
24     }  
25 
26 }

2、java類庫中提供的線程池簡介:

java.util.concurrent包提供了現成的線程池的實現。

技術分享技術分享

示例代碼:

 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newCachedThreadPool()
 7  * 線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。
 8  * 有任務才會創建線程,空閑線程會被保留60s
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest1 {
13     
14     public static void main(String[] args) {
15         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             try {
19                 Thread.sleep(1000);
20             } catch (InterruptedException e) {
21                 e.printStackTrace();
22             }
23             cachedThreadPool.execute(new Runnable() {
24                 @Override
25                 public void run() {
26                     System.out.println(index);
27                     System.out.println(Thread.currentThread().getName());
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newFixedThreadPool(int nThreads)
 7  * 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
 8  * 線程池中包含固定數目的線程,空閑線程會一直保留,參數nThreads表示設定線程池中線程的數目
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest2 {
13     
14     public static void main(String[] args) {
15         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             fixedThreadPool.execute(new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         System.out.println(index);
23                         System.out.println(Thread.currentThread().getName());
24                         Thread.sleep(2000);
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.ScheduledExecutorService;
 7 import java.util.concurrent.TimeUnit;
 8 /**
 9  * newScheduledThreadPool(int corePoolSize)
10  * 線程池能按時間計劃來執行任務,允許用戶設定計劃執行任務的時間。
11  * 參數corePoolSize設定線程池中線程最小數目,當任務較多時,線程池可能會創建更多的工作線程來執行任務。
12  * @author wangbo
13  *
14  */
15 public class ThreadPoolExecutorTest3 {
16     
17     public static void main(String[] args) {
18         
19         method1();
20         method2();
21         
22     }
23     
24     /**
25      * 延遲3s執行
26      */
27     private static void method1(){
28         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
29         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
30         scheduledThreadPool.schedule(new Runnable() {
31             public void run() {
32                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
33                 System.out.println("延遲2s執行");
34             }
35         }, 2, TimeUnit.SECONDS);
36     }
37     
38     /**
39      * 延遲2s執行後每3s執行一次
40      */
41     private static void method2() {
42         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
43         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
44         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
45             public void run() {
46                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
47                 System.out.println("延遲2s執行後每3s執行一次");
48                 System.out.println(Thread.currentThread().getName());
49             }
50         }, 2, 3, TimeUnit.SECONDS);
51     }
52 
53 }
 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newSingleThreadExecutor(int nThreads)
 7  * 線程池中只有一個線程,它依次執行每個任務。
 8  * 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest4 {
13     
14     public static void main(String[] args) {
15         ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             singleThreadPool.execute(new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         System.out.println(index);
23                         System.out.println(Thread.currentThread().getName());
24                         Thread.sleep(2000);
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.ScheduledExecutorService;
 7 import java.util.concurrent.TimeUnit;
 8 /**
 9  * newSingleThreadScheduledExecutor()
10  * 線程池中只有一個線程,它能按照時間計劃執行每個任務。
11  * @author wangbo
12  *
13  */
14 public class ThreadPoolExecutorTest5 {
15     
16     public static void main(String[] args) {
17         
18         method1();
19         method2();
20         
21     }
22     
23     /**
24      * 延遲3s執行
25      */
26     private static void method1(){
27         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
28         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
29         scheduledThreadPool.schedule(new Runnable() {
30             public void run() {
31                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
32                 System.out.println("延遲2s執行");
33             }
34         }, 2, TimeUnit.SECONDS);
35     }
36     
37     /**
38      * 延遲2s執行後每3s執行一次
39      */
40     private static void method2() {
41         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
42         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
43         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
44             public void run() {
45                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
46                 System.out.println("延遲2s執行後每3s執行一次");
47                 System.out.println(Thread.currentThread().getName());
48             }
49         }, 2, 3, TimeUnit.SECONDS);
50     }
51 
52 }

線程池的原理及實現