1. 程式人生 > >Java執行緒池原理及使用

Java執行緒池原理及使用

java中的執行緒池是運用場景最多的併發框架。在開發過程中,合理的使用執行緒池能夠帶來下面的一些好處: 1、降低資源的消耗。 2、提高響應速度。 3、提高執行緒的可管理型。

1.1、執行緒池ThreadPoolExecutor工作原理

講解之前,我們先看一張原理圖 20160221172500424.png

ThreadPoolExecutor執行execute方法有4種情況: 1)如果當前執行的執行緒少於corePoolSize,則建立新的執行緒來執行任務。 2)如果執行的執行緒等於或者多餘corePoolSize,則將任務加入BlockingQueue中,在等待佇列中,等待有新的執行緒可以執行。 3)如果BlockingQueue佇列滿了,且沒有超過maxPoolSize,則建立新的執行緒來處理任務。 4)如果建立的執行緒超過maxPoolSize,任務會拒絕,並呼叫RejectExecutionHandler.rejectedExecution()方法。

1.2、執行緒池的使用

1.2.1、執行緒池的建立

一般我們可以通過ThreadPoolExecutor來建立一個執行緒池。 在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

我們通過

new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);

建立一個新的執行緒池。

下面我們介紹一下需要輸入的幾個引數的意義:

1)corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;

2) maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;

3)keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;

  • unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒</pre>

4) workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:

  • ArrayBlockingQueue:一個基於陣列結構的有界阻塞佇列。
  • LinkedBlockingQueue:一個基於連結串列的阻塞佇列,吞吐量要高於ArrayBlockingQueue。
  • SynchronousQueue:一個不儲存元素的阻塞佇列。每次插入操作必須等到另外一個執行緒呼叫移除操作,否則一直處於阻塞狀態。吞吐量要高於LinkedBlockingQueue。
  • PriorityBlockingQueue:一個具有優先順序的無線阻塞佇列。

ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。

5)threadFactory:執行緒工廠,主要用來建立執行緒; 6)RejectedExecutionHandler:當佇列和執行緒池都滿了,將會執行下面的策略,jdk1.5中提供有以下四種策略:

  • ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
  • ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

1.2.2、如何向執行緒池提交任務

向執行緒池提交任務,提供了兩種方法,分別是execute()和submit()方法。

1)execute()方法

execute方法用於提交不需要返回值的任務,所以也就意味著無法判斷是否執行成功。

pool.execute(new Runnable(){

			@Override
			public void run() {
				System.out.println("使用execute提交任務.");
			}
			
});

2)submit方法

submit方法可以用於提交需要有返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判讀是否執行成功,並且還可以通過get()方法來獲取返回值。

Runnable task = null;
		Future<Object> future = (Future<Object>) pool.submit(task);
        try {
			future.get();//獲取返回值
		} catch (InterruptedException e) {//中斷異常處理
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			//關閉執行緒池
			pool.shutdown();
		}

1.2.3、關閉執行緒池

在上一節中,我們在異常的處理後面,我們就使用到了shutdown()方法來關閉執行緒池。

在關閉執行緒池的時候,這裡有兩個方法可以呼叫,分別是shutdownshutdownNow方法。

1.3、執行緒池使用例項

1.3.1、執行緒池的使用例項

這個例項我們使用自定義的拒絕策略,因為jdk的策略並不是很完美

public class MyRejected implements RejectedExecutionHandler{

	
	public MyRejected(){
	}
	
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.println("自定義處理..");
		System.out.println("當前被拒絕任務為:" + r.toString());
		

	}

}

然後我們定義一個任務類

public class MyTask implements Runnable {

	private int taskId;
	private String taskName;
	
	public MyTask(int taskId, String taskName){
		this.taskId = taskId;
		this.taskName = taskName;
	}
	
	public int getTaskId() {
		return taskId;
	}

	public void setTaskId(int taskId) {
		this.taskId = taskId;
	}

	public String getTaskName() {
		return taskName;
	}

	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}

	@Override
	public void run() {
		try {
			System.out.println("run taskId =" + this.taskId);
			Thread.sleep(5*1000);
			//System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		
	}
	
	public String toString(){
		return Integer.toString(this.taskId);
	}

}

最後,我們看一下任務執行

public class UseThreadPoolExecutor1 {


	public static void main(String[] args) {
		/**
		 * 在使用有界佇列時,若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize,則優先建立執行緒,
		 * 若大於corePoolSize,則會將任務加入佇列,
		 * 若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒,
		 * 若執行緒數大於maximumPoolSize,則執行拒絕策略。或其他自定義方式。
		 * 
		 */	
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, 				//coreSize
				2, 				//MaxSize
				60, 			//60
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3)			//指定一種佇列 (有界佇列)
				//new LinkedBlockingQueue<Runnable>()
				, new MyRejected()
				//, new DiscardOldestPolicy()
				);
		
		MyTask mt1 = new MyTask(1, "任務1");
		MyTask mt2 = new MyTask(2, "任務2");
		MyTask mt3 = new MyTask(3, "任務3");
		MyTask mt4 = new MyTask(4, "任務4");
		MyTask mt5 = new MyTask(5, "任務5");
		MyTask mt6 = new MyTask(6, "任務6");
		
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);
		
		
		
		/*pool.execute(mt4);
		pool.execute(mt5);
		pool.execute(mt6);*/
		
		pool.shutdown();
//		pool.shutdownNow();
		
	}
}

執行結果: 1)當執行<5個時,可以正常執行: 搜狗截圖20180927165203.jpg 2)當>5時,因為大於了最大值,所以執行了異常策略: 搜狗截圖20180927165223.jpg

1.3.2、執行緒池的監控引數或者其他api使用

當我們需要對執行緒池進行監控時,我們可以使用執行緒池提供的引數進行監控,可以使用下面的一些屬性。

  • taskCount:執行緒池需要執行的任務數量。
  • completedTaskCount:執行緒池在執行過程中已完成的數量。
  • largestPoolSize:執行緒池裡曾經建立過的最大的執行緒數量。
  • poolSize:執行緒池的執行緒數量。
  • ActiveCount:獲取活動的執行緒數量。
System.out.println(pool.getTaskCount());
System.out.println(pool.getCompletedTaskCount());
System.out.println(pool.getLargestPoolSize());
System.out.println(pool.getPoolSize());
System.out.println(pool.getActiveCount());

執行結果: 搜狗截圖20180927170209.jpg

1.4、如何合理的配置執行緒池的大小

一般需要根據任務的型別來配置執行緒池大小:

  • 如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1

  • 如果是IO密集型任務,參考值可以設定為2*NCPU

  • 建議使用有界佇列。因為有界佇列能夠增加系統的穩定性和預警的能力,我們可以想象一下,當我們使用無界佇列的時候,當我們系統的後臺的執行緒池的佇列和執行緒池會越來越多,這樣當達到一定的程度的時候,有可能會撐滿記憶體,導致系統出現問題。當我們是有界佇列的時候,當我們系統的後臺的執行緒池的佇列和執行緒池滿了之後,會不斷的丟擲異常的任務,我們可以通過異常資訊做一些事情。

當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,再觀察任務執行情況和系統負載、資源利用率來進行適當調整。