1. 程式人生 > >JDK併發包-執行緒複用:執行緒池

JDK併發包-執行緒複用:執行緒池

為了避免系統頻繁地建立和銷燬執行緒,我們可以讓建立的執行緒進行復用。執行緒池中,總有那麼幾個活躍執行緒。當你需要使用執行緒時,可以從池子中隨便拿一個空閒執行緒,當完成工作時,並不急著關閉執行緒,而是將整個執行緒退回到池子,方便其他人使用。

1.1 JDK對執行緒池的支援

JDK提供一套Executor框架,幫助開發人員更好的有效的控制多執行緒。

其中ThreadPoolExecutor表示一個執行緒池。Executors類則扮演這執行緒池工廠的角色。通過Executors可以取得一個擁有特定功能的執行緒池。ThreadPoolExecutor類實現了Executor介面,因此通過這個介面,任何Runnable的物件都可以被ThreadPoolExecutor執行緒池排程。

Executor框架提供了各種型別的執行緒池,主要有以下工廠方法:

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool
(int corePoolSize)
  • newFixedThreadPool:返回一個固定數量的執行緒池。當一個新任務提交時,如果有空閒執行緒,則執行。否則新任務暫存在一個任務佇列中,待有空閒時,便處理在任務佇列中的任務。
  • newSingleThreadExecutor:返回一個執行緒的執行緒池。當多餘一個新任務提交時,會暫存在一個任務佇列中,待有空閒時,按先入先出的順序處理在任務佇列中的任務。
  • newCachedThreadPool:返回一個可根據實際情況調整執行緒數量的執行緒池,執行緒數量不確定,若有空閒,則會有限複用執行緒。否則建立新執行緒處理任務。所有執行緒在當前任務執行完後,將返回執行緒池待複用。
  • newSingleThreadScheduledExecutor:返回一個ScheduledExecutorService物件,執行緒池大小為1。ScheduledExecutorService在Executor介面之上擴充套件了在給定時間執行某任務的功能。如果在某個固定的延時之後執行,或週期性執行某個任務。可以用這個工廠。
  • newScheduledThreadPool,返回一個ScheduledExecutorService物件,但該執行緒可以指定執行緒數量。

1.2 核心執行緒池的內部實現

對於核心的幾個執行緒池,無論是newFixedThreadPool()方法、newSingleThreadExecutor()還是newCachedThreadPool()方法,其內部實現均使用了ThreadPoolExecutor實現。下面給出三個執行緒池的實現方式:

public static ExecutorService newSingleThreadExecutor() {   
	return new FinalizableDelegatedExecutorService   
		(new ThreadPoolExecutor(1, 1,   
                                0L, TimeUnit.MILLISECONDS,   
                                new LinkedBlockingQueue<Runnable>()));   
}

public static ExecutorService newFixedThreadPool(int nThreads) {     
eturn new ThreadPoolExecutor(nThreads, nThreads,     
  0L, TimeUnit.MILLISECONDS,     
                new LinkedBlockingQueue<Runnable>());     
}  

public static ExecutorService newCachedThreadPool() {   
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                  60L, TimeUnit.SECONDS,   
                                  new SynchronousQueue<Runnable>());   
}

由以上執行緒池實現程式碼可以看到,它們都只是ThreadPoolExecutor類的封裝。ThreadPoolExecutor最重要的建構函式如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:指定了執行緒池中的執行緒數量。
  • maximumPoolSize:指定了執行緒池中的最大執行緒數量。
  • keepAliveTime:當前執行緒池數量超過corePoolSize時,多餘的空閒執行緒的存活時間,即多次時間內會被銷燬。
  • unit:keepAliveTime的單位。
  • workQueue:任務佇列,被提交但尚未被執行的任務。
  • threadFactory:執行緒工廠,用於建立執行緒,一般用預設的即可。
  • handler:拒絕策略,當任務太多來不及處理,如何拒絕任務。

引數workQueue指被提交但未執行的任務佇列,它是一個BlockingQueue介面的物件,僅用於存放Runnable物件。根據佇列功能分類,在ThreadPoolExecutor的建構函式中可使用一下幾種BlockingQueue。

  1. 直接提交的佇列:該功能由synchronousQueue物件提供,synchronousQueue物件是一個特殊的BlockingQueue。synchronousQueue沒有容量,每一個插入操作都要等待一個響應的刪除操作,反之每一個刪除操作都要等待對應的插入操作。如果使用synchronousQueue,提交的任務不會被真實的儲存,而總是將新任務提交給執行緒執行,如果沒有空閒執行緒,則嘗試建立執行緒,如果執行緒數量已經達到了最大值,則執行拒絕策略,因此,使用synchronousQueue佇列,通常要設定很大的maximumPoolSize值,否則很容易執行拒絕策略。
  2. 有界的任務佇列:有界任務佇列可以使用ArrayBlockingQueue實現。ArrayBlockingQueue建構函式必須帶有一個容量引數,表示佇列的最大容量。
public ArrayBlockingQueue(int capacity)

當使用有界任務佇列時,若有新任務需要執行時,如果執行緒池的實際執行緒數量小於corePoolSize,則會優先建立執行緒。若大於corePoolSize,則會將新任務加入等待佇列。若等待佇列已滿,無法加入,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒執行任務。若大於maximumPoolSize,則執行拒絕策略。可見有界佇列僅當在任務佇列裝滿後,才可能將執行緒數量提升到corePoolSize以上,換言之,除非系統非常繁忙,否則確保核心執行緒數維持在corePoolSize。
3. 無界的任務佇列:無界佇列可以通過LinkedBlockingQueue類實現。與有界佇列相比,除非系統資源耗盡,無界佇列的任務佇列不存在任務入隊失敗的情況。若有新任務需要執行時,如果執行緒池的實際執行緒數量小於corePoolSize,則會優先建立執行緒執行。但當系統的執行緒數量達到corePoolSize後就不再建立了,這裡和有界任務佇列是有明顯區別的。若後續還有新任務加入,而又沒有空閒執行緒資源,則任務直接進入佇列等待。若任務建立和處理的速度差異很大,無界佇列會保持快速增長,知道耗盡系統記憶體。
4. 優先任務佇列:帶有優先級別的佇列,它通過PriorityBlokingQueue實現,可以控制任務執行的優先順序。它是一個特殊的無界佇列。無論是ArrayBlockingQueue還是LinkedBlockingQueue實現的佇列,都是按照先進先出的演算法處理任務,而PriorityBlokingQueue根據任務自身優先順序順序先後執行,在確保系統性能同時,也能很好的質量保證(總是確保高優先順序的任務優先執行)。

newFixedThreadPool()方法的實現,它返回一個corePoolSize和maximumPoolSize一樣的,並使用了LinkedBlockingQueue任務佇列(無界佇列)的執行緒池。當任務提交非常頻繁時,該佇列可能迅速膨脹,從而系統資源耗盡。

newSingleThreadExecutor()返回單執行緒執行緒池,是newFixedThreadPool()方法的退化,只是簡單的將執行緒池數量設定為1。

newCachedThreadPool()方法返回corePoolSize為0而maximumPoolSize無窮大的執行緒池,這意味著沒有任務的時候執行緒池內沒有現場,而當任務提交時,該執行緒池使用空閒執行緒執行任務,若無空閒則將任務加入SynchronousQueue佇列,而SynchronousQueue佇列是直接提交佇列,它總是破事執行緒池增加新的執行緒來執行任務。當任務執行完後由於corePoolSize為0,因此空閒執行緒在指定時間內(60s)被回收。對於newCachedThreadPool(),如果有大量任務提交,而任務又不那麼快執行時,那麼系統變回開啟等量的執行緒處理,這樣做法可能會很快耗盡系統的資源,因為它會增加無窮大數量的執行緒。

1.3 拒絕策略

當任務數量超過系統實際承載能力時,需要拒絕策略機制解決這個問題。JDK內建的拒絕策略如下:

  1. AbortPolicy : 直接丟擲異常,阻止系統正常執行。
  2. CallerRunsPolicy : 只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是,任務提交執行緒的效能極有可能會急劇下降。
  3. DiscardOldestPolicy : 丟棄最老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務。
  4. DiscardPolicy : 該策略默默地丟棄無法處理的任務,不予任何處理。如果允許任務丟失,這是最好的一種方案。

以上內建拒絕策略均實現了RejectedExecutionHandler介面,若以上策略仍無法滿足實際需要,完全可以自己擴充套件RejectedExecutionHandler介面。RejectedExecutionHandler的定義如下:

public interface RejectedExecutionHandler() {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor  executor);
});

下面程式碼演示自定義執行緒池和拒絕策略的使用:

public class RejectThreadPoolDemo {
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + ":Thread ID:"
					+ Thread.currentThread().getId());

			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	public static void main(String[] args) throws InterruptedException {
		MyTask task = new MyTask();
		ExecutorService es = new ThreadPoolExecutor(5, 5,
				0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingDeque<Runnable>(10),
				Executors.defaultThreadFactory(),
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor  executor) {    
						System.out.println(r.toString() + "is discard");
					}
				});

		for(int i=0; i<Integer.MAX_VALUE; i++)  {
			es.submit(task);
			Thread.sleep(10);
		}
	}
}

1.4 ThreadFactory自定義執行緒建立

ThreadFactory是一個介面,它只有一個方法,用來建立執行緒:

Thread newThread(Runnable r);

下面是使用自定義ThreadFactory的案例,一方面記錄執行緒的建立,另一方面將所有執行緒設定為守護執行緒,這樣主執行緒退出,將會強制銷燬執行緒池。


public static void main(String[] args) throws InterruptedException {
	MyTask task = new MyTask();
	ExecutorService es = new ThreadPoolExecutor(5,5,
			0L, TimeUnit.MILLISECONDS,
			new SynchronousQueue<Runnable>(),
			new ThreadFactory() {
				@Override
				public Thread newThread(Runnable r) {
					Thread t = new Thread(r);
					t.setDaemon(true);
					System.out.println("create " + t);
					return t;
				}
			});
	for(int i=0; i<5; i++) {
		es.submit(task);
	}
	Thread.sleep(2000);
}

1.5 擴充套件執行緒池

雖然JDK實現了非常穩定的高效能執行緒池,但是我們可以對這個執行緒池做一些擴充套件,增強執行緒池的功能。ThreadPoolExecutor是一個可以擴充套件的執行緒池。它提供了beforeExecute()、afterExecute()、terminated()三個介面對執行緒池進行控制。這三個方法分別用於記錄一個任務的開始、結束和整個執行緒池的退出。下面演示了對執行緒池的擴充套件,在這個拓展中,我們將記錄每一個任務的執行日誌。

public class ExtThreadPool {
	public static class MyTask implements Runnable {
		public String name;
		public MyTask(String name) {
			this.name = name;
		}
		@Override
		public void run() {
			System.out.println("正在執行" + ":Thread ID:" + Thread.currentThread().getId()
					+ ",Task Name=" + name);
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	public static void main(String[] args) throws InterruptedException {
		ExecutorService es = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>()) {
			@Override
			protected void beforeExecute(Thread t, Runnable r) {
				System.out.println("準備執行:" + ((MyTask)r).name);
			}
			@Override
			protected void afterExecute(Runnable r, Throwable t) {
				System.out.println("執行完成:" + ((MyTask)r).name);
			}
			@Override
			protected void terminated() {
				System.out.println("執行緒池退出");
			}
		};
		for(int i=0; i<5; i++) {
			MyTask task = new MyTask("TASK-GEYM-" + i);
			es.execute(task);
			Thread.sleep(10);
		}
		es.shutdown();
	}
}

1.6 優化執行緒池執行緒數量

執行緒池的大小對系統性能有一定的影響,過大或過小的執行緒數量都無法發揮最優的系統性能,因此要避免極大和極小兩種情況。 在《java Concurrency in Practice》中給出了一個估算執行緒池大小的經驗公式:

Ncpu = CPU數量
Ucpu = 目標CPU的使用率(0 ≤ Ucpu ≤ 1 )
W/C = 等待時間與計算時間的比率
最優的池大小等於
Nthreads = Ncpu * Ucpu * (1+W/C)

在java中可以通過Runtime.getRuntime().availableProcessors()取得可用CPU數量。

1.7 線上程池中尋找堆疊

向執行緒池討回異常堆疊的方法。一種最簡單的方法,就是放棄submit(),改用execute()。將任務提交的程式碼改成:

pools.execute(new DivTask(100,i));

或者使用下面的方法改造submit():

Future re = pools.submit(new DivTask(100,i));
re.get();

上述方法,從異常堆疊中只能知道異常在哪裡丟擲的。還希望知道這個任務在哪裡提交的,需要我們擴充套件ThreadPoolExecutor執行緒池,讓它在排程之前,先儲存一下提交任務執行緒的堆疊資訊。如下所示:

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
	public TraceThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
			long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
		super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
	}
	@Override
	public void execute(Runnable task) {
		// TODO Auto-generated method stub
		super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
	}

	@Override
	public Future<?> submit(Runnable task) {
		// TODO Auto-generated method stub
		return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
	}
	private Exception clientTrace() {
		return new Exception("Client stack trace");
	}
	private Runnable wrap(final Runnable task,final Exception clientStack, String clientThreadName) {
		return new Runnable() {
			@Override
			public void run() {
				try {
					task.run();
				} catch (Exception e) {
					clientStack.printStackTrace();
					try {
						throw e;
					} catch (Exception e1) {
						// TODO Auto-generated catch block
						e1.printStackTrace();
					}
				}
			}
		};
	}
	public static class DivTask implements Runnable {
		int a,b;
		public DivTask(int a,int b) {
			this.a = a;
			this.b = b;
		}
		@Override
		public void run() {
			double re = a/b;
			System.out.println(re);
		}
	}

	public static void main(String[] args) {
		ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
				0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
		for(int i=0; i<5; i++)
			pools.execute(new DivTask(100, i));
	}
}

1.8 Fork/Join框架

分而治之一直是一個非常有效處理大量資料的方法。fork/join框架是ExecutorService介面的一個實現,可以幫助開發人員充分利用多核處理器的優勢,編寫出並行執行的程式,提高應用程式的效能;設計的目的是為了處理那些可以被遞迴拆分的任務。
在實際使用中,如果毫無顧忌地使用fork()開啟執行緒進行處理,那麼很有可能導致系統開啟過多的執行緒而嚴重影響效能。所以,在JDK中,給出了一個ForkJoinPool執行緒池,對於fork()方法並不急於開啟執行緒,而是提交給ForkJoinPool()執行緒池進行處理,以節省系統資源。可以向ForkJoinPool執行緒池提交一個ForkJoinTask任務。所謂ForkJoinTask任務就是支援fork()分解以及join()等待的任務。ForkJoinTask有兩個重要的子類,RecursiveAction和RecursiveTask。它們分別代表沒有返回值的任務和可以攜帶返回值的任務。
下面簡單地展示Fork/Join框架的使用,這裡用來計算數列求和:

public class CountTask extends RecursiveTask<Long> {
	private static final int THRESHOLD = 10000;
	private long start;
	private long end;
	public CountTask(long start,long end) {
		this.start = start;
		this.end = end;
	}
	public Long compute() {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;
		if(canCompute) {
			for(long i=start; i<=end; i++) {
				sum += i;
			}
		} else {
			long step = (start + end)/100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for(int i=0; i<100; i++) {
				long lastOne = pos + step;
				if(lastOne > end) lastOne = end;
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step+1;
				subTasks.add(subTask);
				subTask.fork();
			}
			for(CountTask t:subTasks) {
				sum += t.join();
			}
		}
		return sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0,200000L);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			long res = result.get();
			System.out.println("sum="+res);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
}

在使用ForkJoin時需要注意,如果任務的劃分層次很深,一直得不到返回,那麼可能出現兩種情況:第一,系統內的執行緒數量越積越多,導致效能嚴重下降。第二,函式的呼叫層次變得很深,最終導致棧溢位。
此外,ForkJoin執行緒池使用一個無鎖是棧來管理空閒執行緒。如果一個工作執行緒暫時取不到可用的任務,則可能會被掛起,掛起的執行緒將會被壓入由執行緒池維護的棧中。待將來有任務可用時,再從棧中喚醒這些執行緒。