1. 程式人生 > >【JVM第九篇】:Executor框架與執行緒池

【JVM第九篇】:Executor框架與執行緒池

Executor框架簡介

在Java 5之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor框架便是Java 5中引入的,其內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計的操作。因此,在Java 5之後,通過Executor來啟動執行緒比使用Thread的start方法更好,除了更易管理,效率更好(用執行緒池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題——如果我們在構造器中啟動一個執行緒,因為另一個任務可能會在構造器結束之前開始執行,此時可能會訪問到初始化了一半的物件用Executor在構造器中。

Executor框架包括:執行緒池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor介面中之定義了一個方法execute(Runnable command),該方法接收一個Runable例項,它用來執行一個任務,任務即一個實現了Runnable介面的類。ExecutorService介面繼承自Executor介面,它提供了更豐富的實現多執行緒的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。 可以呼叫ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService。因此我們一般用該介面來實現和管理多執行緒。

ExecutorService的生命週期包括三種狀態:執行、關閉、終止。建立後便進入執行狀態,當呼叫了shutdown()方法時,便進入關閉狀態,此時意味著ExecutorService不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完後,便到達終止狀態。如果不呼叫shutdown()方法,ExecutorService會一直處在執行狀態,不斷接收新的任務,執行新的任務,伺服器端一般不需要關閉它,保持一直執行即可。

Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService介面。

  1. 建立固定數目執行緒的執行緒池。
public static ExecutorService newFixedThreadPool(int nThreads)
  1. 建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線 程並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。
public static ExecutorService newCachedThreadPool()
  1. 建立一個單執行緒化的Executor。
public static ExecutorService newSingleThreadExecutor()
  1. 建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

這四種方法都是用的Executors中的ThreadFactory建立的執行緒,下面就以上四個方法做個比較:
newCachedThreadPool()

  1. 快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就 reuse.如果沒有,就建一個新的執行緒加入池中
  2. 快取型池子通常用於執行一些生存期很短的非同步型任務
    因此在一些面向連線的daemon型SERVER中用得不多。但對於生存期短的非同步任務,它是Executor的首選。
  3. 能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設 timeout是60s,超過這個IDLE時長,執行緒例項將被終止及移出池。
    注意,放入CachedThreadPool的執行緒不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。

newFixedThreadPool(int)

  1. newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒
  2. 其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子
  3. 和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器
  4. 從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層 池,只不過引數不同:
    fixed池執行緒數固定,並且是0秒IDLE(無IDLE)
    cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE。

newFixedThreadPool(int)

  1. newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒
  2. 其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子
  3. 和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器
  4. 從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層 池,只不過引數不同:
    fixed池執行緒數固定,並且是0秒IDLE(無IDLE)
    cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE。

newScheduledThreadPool(int)

  1. 排程型執行緒池
  2. 這個池子裡的執行緒可以按schedule依次delay執行,或週期執行

SingleThreadExecutor()

  1. 單例執行緒,任意時間池中只能有一個執行緒
  2. 用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE)

一般來說,CachedTheadPool在程式執行過程中通常會建立與所需數量相同的執行緒,然後在它回收舊執行緒時停止建立新執行緒,因此它是合理的Executor的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連線的執行緒時),才需要考慮用FixedThreadPool。(該段話摘自《Thinking in Java》第四版)

Executor執行Runnable任務

通過Executors的以上四個靜態工廠方法獲得 ExecutorService例項,而後呼叫該例項的execute(Runnable command)方法即可。一旦Runnable任務傳遞到execute()方法,該方法便會自動在一個執行緒上執行。下面是是Executor執行Runnable任務的示例程式碼:

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
 
public class TestCachedThreadPool{ 
	public static void main(String[] args){ 
        ExecutorService executorService = Executors.newCachedThreadPool(); 
         for (int i = 0; i < 5; i++){ 
			executorService.execute(new TestRunnable()); 
			System.out.println("************* a" + i + " *************"); 
		} 
        executorService.shutdown(); 
	} 
} 
 
class TestRunnable implements Runnable{ 
	public void run(){ 
		System.out.println(Thread.currentThread().getName() + "執行緒被呼叫了。"); 
    } 
}

某次執行後的結果如下:
在這裡插入圖片描述

從結果中可以看出,pool-1-thread-1和pool-1-thread-2均被呼叫了兩次,這是隨機的,execute會首先線上程池中選擇一個已有空閒執行緒來執行任務,如果執行緒池中沒有空閒執行緒,它便會建立一個新的執行緒來執行任務。

Executor執行Callable任務

在Java 5之後,任務分兩類:一類是實現了Runnable介面的類,一類是實現了Callable介面的類。兩者都可以被ExecutorService執行,但是Runnable任務沒有返回值,而Callable任務有返回值。並且Callable的call()方法只能通過ExecutorService的submit(Callable task) 方法來執行,並且返回一個 Future,是表示任務等待完成的 Future。

Callable介面類似於Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會丟擲異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,後者沒有。

當將一個Callable的物件傳遞給ExecutorService的submit方法,則該call方法自動在一個執行緒上執行,並且會返回執行結果Future物件。同樣,將Runnable的物件傳遞給ExecutorService的submit方法,則該run方法自動在一個執行緒上執行,並且會返回執行結果Future物件,但是在該Future物件上呼叫get方法,將返回null。

下面給出一個Executor執行Callable任務的示例程式碼:

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 
 
public class CallableDemo{ 
	public static void main(String[] args){ 
		ExecutorService executorService = Executors.newCachedThreadPool(); 
		List<Future<String>> resultList = new ArrayList<Future<String>>(); 
 
		//建立10個任務並執行 
		for (int i = 0; i < 10; i++){ 
			//使用ExecutorService執行Callable型別的任務,並將結果儲存在future變數中 
			Future<String> future = executorService.submit(new TaskWithResult(i)); 
			//將任務執行結果儲存到List中 
			resultList.add(future); 
		} 
 
		//遍歷任務的結果 
		for (Future<String> fs : resultList){ 
				try{ 
					while(!fs.isDone);//Future返回如果沒有完成,則一直迴圈等待,直到Future返回完成
					System.out.println(fs.get());     //列印各個執行緒(任務)執行的結果 
				}catch(InterruptedException e){ 
					e.printStackTrace(); 
				}catch(ExecutionException e){ 
					e.printStackTrace(); 
				}finally{ 
					//啟動一次順序關閉,執行以前提交的任務,但不接受新任務
					executorService.shutdown(); 
				} 
		} 
	} 
} 
 
 
class TaskWithResult implements Callable<String>{ 
	private int id; 
 
	public TaskWithResult(int id){ 
		this.id = id; 
	} 
 
	/** 
	 * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,
	 * 則該方法自動在一個執行緒上執行
	 */ 
	public String call() throws Exception {
		System.out.println("call()方法被自動呼叫!!!    " + Thread.currentThread().getName()); 
		//該返回結果將被Future的get方法得到
		return "call()方法被自動呼叫,任務返回的結果是:" + id + "    " + Thread.currentThread().getName(); 
	} 
}

某次執行結果如下:
在這裡插入圖片描述

從結果中可以同樣可以看出,submit也是首先選擇空閒執行緒來執行任務,如果沒有,才會建立新的執行緒來執行任務。另外,需要注意:如果Future的返回尚未完成,則get()方法會阻塞等待,直到Future完成返回,可以通過呼叫isDone()方法判斷Future是否完成了返回。

自定義執行緒池

自定義執行緒池,可以用ThreadPoolExecutor類建立,它有多個構造方法來建立執行緒池,用該類很容易實現自定義的執行緒池,這裡先貼上示例程式:

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
 
public class ThreadPoolTest{ 
	public static void main(String[] args){ 
		//建立等待佇列 
		BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); 
		//建立執行緒池,池中儲存的執行緒數為3,允許的最大執行緒數為5
		ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); 
		//建立七個任務 
		Runnable t1 = new MyThread(); 
		Runnable t2 = new MyThread(); 
		Runnable t3 = new MyThread(); 
		Runnable t4 = new MyThread(); 
		Runnable t5 = new MyThread(); 
		Runnable t6 = new MyThread(); 
		Runnable t7 = new MyThread(); 
		//每個任務會在一個執行緒上執行
		pool.execute(t1); 
		pool.execute(t2); 
		pool.execute(t3); 
		pool.execute(t4); 
		pool.execute(t5); 
		pool.execute(t6); 
		pool.execute(t7); 
		//關閉執行緒池 
		pool.shutdown(); 
	} 
} 
 
class MyThread implements Runnable{ 
	@Override 
	public void run(){ 
		System.out.println(Thread.currentThread().getName() + "正在執行。。。"); 
		try{ 
			Thread.sleep(100); 
		}catch(InterruptedException e){ 
			e.printStackTrace(); 
		} 
	} 
}

執行結果如下:
在這裡插入圖片描述

從結果中可以看出,七個任務是線上程池的三個執行緒上執行的。這裡簡要說明下用到的ThreadPoolExecuror類的構造方法中各個引數的含義。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:執行緒池中所儲存的核心執行緒數,包括空閒執行緒。

maximumPoolSize:池中允許的最大執行緒數。

keepAliveTime:執行緒池中的空閒執行緒所能持續的最長時間。

unit:持續時間的單位。

workQueue:任務執行前儲存任務的佇列,僅儲存由execute方法提交的Runnable任務。
根據ThreadPoolExecutor原始碼前面大段的註釋,我們可以看出,當試圖通過excute方法講一個Runnable任務新增到執行緒池中時,按照如下順序來處理:

  1. 如果執行緒池中的執行緒數量少於corePoolSize,即使執行緒池中有空閒執行緒,也會建立一個新的執行緒來執行新新增的任務;

  2. 如果執行緒池中的執行緒數量大於等於corePoolSize,但緩衝佇列workQueue未滿,則將新新增的任務放到workQueue中,按照FIFO的原則依次等待執行(執行緒池中有執行緒空閒出來後依次將緩衝佇列中的任務交付給空閒的執行緒執行);

  3. 如果執行緒池中的執行緒數量大於等於corePoolSize,且緩衝佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的執行緒來處理被新增的任務;

  4. 如果執行緒池中的執行緒數量等於了maximumPoolSize,有4種才處理方式(該構造方法呼叫了含有5個引數的構造方法,並將最後一個構造方法為RejectedExecutionHandler型別,它在處理執行緒溢位時有4種方式,這裡不再細說,要了解的,自己可以閱讀下原始碼)。

總結起來,也即是說,當有新的任務要處理時,先看執行緒池中的執行緒數量是否大於corePoolSize,再看緩衝佇列workQueue是否滿,最後看執行緒池中的執行緒數量是否大於maximumPoolSize。

另外,當執行緒池中的執行緒數量大於corePoolSize時,如果裡面有執行緒的空閒時間超過了keepAliveTime,就將其移除執行緒池,這樣,可以動態地調整執行緒池中執行緒的數量。

我們大致來看下Executors的原始碼,newCachedThreadPool的不帶RejectedExecutionHandler引數(即第五個引數,執行緒數量超過maximumPoolSize時,指定處理方式)的構造方法如下:

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

它將corePoolSize設定為0,而將maximumPoolSize設定為了Integer的最大值,執行緒空閒超過60秒,將會從執行緒池中移除。由於核心執行緒數為0,因此每次新增任務,都會先從執行緒池中找空閒執行緒,如果沒有就會建立一個執行緒(SynchronousQueue決定的,後面會說)來執行新的任務,並將該執行緒加入到執行緒池中,而最大允許的執行緒數為Integer的最大值,因此這個執行緒池理論上可以不斷擴大。
再來看newFixedThreadPool的不帶RejectedExecutionHandler引數的構造方法,如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

擴大,另外,keepAliveTime設定為了0,也就是說執行緒只要空閒下來,就會被移除執行緒池,敢於LinkedBlockingQueue下面會說。
下面說說幾種排隊的策略:

  1. 直接提交。緩衝佇列採用 SynchronousQueue,它將任務直接交給執行緒處理而不保持它們。如果不存在可用於立即執行任務的執行緒(即執行緒池中的執行緒都在工作),則試圖把任務加入緩衝佇列將會失敗,因此會構造一個新的執行緒來處理新新增的任務,並將其加入到執行緒池中。直接提交通常要求無界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒絕新提交的任務。newCachedThreadPool採用的便是這種策略。

  2. 無界佇列。使用無界佇列(典型的便是採用預定義容量的 LinkedBlockingQueue,理論上是該緩衝佇列可以對無限多的任務排隊)將導致在所有 corePoolSize 執行緒都工作的情況下將新任務加入到緩衝佇列中。這樣,建立的執行緒就不會超過 corePoolSize,也因此,maximumPoolSize 的值也就無效了。當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列。newFixedThreadPool採用的便是這種策略。

  3. 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(一般緩衝佇列使用ArrayBlockingQueue,並制定佇列的最大長度)有助於防止資源耗盡,但是可能較難調整和控制,佇列大小和最大池大小需要相互折衷,需要設定合理的引數。