Java併發程式設計(5)-Executor執行緒排程框架解讀
文章目錄
更多關於Java併發程式設計的文章請點選這裡:Java併發程式設計實踐(0)-目錄頁
Executor框架是在JDK5之後引入的一種執行緒排程類的集合,這些類的集合可以被統稱為執行緒排程框架,本文將介紹Executor執行緒排程框架、各類執行緒池、Runnable和Callable< T >任務排程介面以及如何使用執行緒排程框架構造一個併發程式,最後介紹一下ExecutorCompletionService。
本文總結自《Java併發程式設計實踐》第六章 任務執行 ,以及一些相關部落格。
一、Executor執行緒排程框架
1.1、什麼是執行緒排程框架
Executor框架
是在JDK5之後引入的一種執行緒排程類的集合,這些類的集合可以被統稱為執行緒排程框架,在這個框架體系中,任務的執行最小單元不是執行緒(Thread
),而是任務(Task
);而Executor可視為任務的執行器,它的作用是執行任務,進行任務的排程。常用的執行緒排程類和介面有:
Executors工廠類、Executor介面、ExecutorService介面、ExecutorCompletionService類、Runnable任務介面、Callable< T >響應式任務介面、Future< T >任務狀態類、newFixedThreadPool定長執行緒池、newCachedThreadPool可快取執行緒池、newSingleThreadExecutor單一執行緒池、newScheduledThreadPool可排程執行緒池。以上可簡單地用結構圖表示:
1.2、Executors
提供了一系列靜態工廠方法用於建立各種執行緒池,可返回一個Executor物件或者ExecutorService物件,用以執行任務。
1.3、Executor
一個介面,其定義了一個接收Runnable物件的方法executor(Runnable command),用以執行Runabl任務。
1.4、ExecutorService
繼承於Executor,是一個比Executor使用更廣泛的子類介面,其提供了生命週期管理的方法,以及可跟蹤一個或多個非同步任務執行狀況返回Future的方法。
1.5、ScheduledThreadPoolExecutor
ScheduledExecutorService的實現,一個可定時排程任務的執行緒池,由Executors.newScheduledThreadPool返回,可用來代替Timer進行定時任務排程。
二、各類執行緒池
2.1、newFixedThreadPool
建立可重用且固定執行緒數的執行緒池,如果執行緒池中的所有執行緒都處於活動狀態,此時再提交任務就在佇列中等待,直到有可用執行緒;如果執行緒池中的某個執行緒由於異常而結束時,執行緒池就會再補充一條新執行緒。原理是有界的阻塞佇列。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
//使用一個基於FIFO排序的阻塞佇列,在所有corePoolSize執行緒都忙時新任務將在佇列中等待
new LinkedBlockingQueue<Runnable>());
}
2.2、newCachedThreadPool
建立可快取的執行緒池,如果執行緒池中的執行緒在60秒未被使用就將被移除,在執行新的任務時,當執行緒池中有之前建立的可用執行緒就重用可用執行緒,否則就新建一條執行緒。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
//使用同步佇列,將任務直接提交給執行緒
new SynchronousQueue<Runnable>());
}
2.3、newScheduledThreadPool
建立一個可延遲執行或定期執行的執行緒池。
public class HeartBeat {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Runnable task = new Runnable() {
public void run() {
System.out.println("HeartBeat.........................");
}
};
executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS); //5秒後第一次執行,之後每隔3秒執行一次
}
}
2.4、newSingleThreadExecutor
建立一個單執行緒的Executor,如果該執行緒因為異常而結束就新建一條執行緒來繼續執行後續的任務。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
//corePoolSize和maximumPoolSize都等於,表示固定執行緒池大小為1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
三、Runnable和Callable< T >任務排程介面
3.1、Runnable介面
每個任務排程介面都是另起一個執行緒作業,其中Runnable介面中的run方法不能返回引數,所以也稱為無響應任務介面,結合上面的內容,我們來看以下簡單的使用:
@Test
public void demo1(){
//使用Executors工廠類建立含10個執行緒的Executor執行器
Executor executor = Executors.newFixedThreadPool(10);
//建立一個無響應式任務
Runnable task = new Runnable(){
public void run(){
//具體任務實現
System.out.println("任務已執行");
}
};
//執行任務
executor.execute(task);
}
結果:
3.2、Callable< T >介面和Future介面
Callable< T >介面用以實現有返回值的任務,需要注意的是,應該宣告ExecutorService 介面型別來提交callable任務,並且獲得了當前任務的狀態宣告Future物件,通過它獲取到任務返回的值,如下:
@Test
public void demo2() throws ExecutionException, InterruptedException {
//使用Executors工廠類建立含10個執行緒的ExecutorService執行器
ExecutorService executor = Executors.newFixedThreadPool(10);
//建立一個響應式任務
Callable<Integer> task = new Callable<Integer>(){
public Integer call(){
//執行有返回值的任務
int i = 0;
return ++i;
}
};
//提交任務,獲得任務狀態類Futrue
Future<Integer> future = executor.submit(task);
//獲得Future中的任務返回的響應值
Integer response = future.get();
System.out.println("任務執行,並返回了:"+response);
}
四、使用執行緒排程框架構造一個併發程式
4.1、單執行緒模式
結合上述知識,我們可以構造一個簡單的併發程式了。設想這樣一個情景:小明早上起床了,他去煮水。。。
如果是單執行緒程式,比如只用main()方法執行,小明只能等水煮完了,才能去看書,聽歌,如以下程式碼:
public static void main(String[] args) {
//第一個迴圈執行10次,其中i表示小明正在煮水的進度
for (int i = 0; i < 10; i++) {
System.out.println("煮水中:"+i);
}
//第二個迴圈,表示看書
for (int i = 0; i < 5; i++) {
System.out.println("看書中:"+i);
}
//第三個迴圈,表示聽音樂
for (int i = 0; i < 5; i++) {
System.out.println("聽歌中: "+i);
}
//...
}
4.2、多執行緒排程模式-使用Runnable
上面的就是典型的單執行緒模式了,所有工作都要從頭執行到尾,可是在現實中,小明可以變煮水邊看書邊聽歌,這樣就不會浪費時間,所以這裡就要開啟至少3個執行緒去做這3件事情:
@Test
public void demo3() throws ExecutionException, InterruptedException {
//使用Executors工廠類建立含3個執行緒的ExecutorService執行器
ExecutorService executor = Executors.newFixedThreadPool(3);
//建立一個任務1-煮水
Runnable task1 = new Runnable(){
public void run(){
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+"煮水中: "+i);
}
}
};
//建立一個任務2-看書
Runnable task2 = new Runnable(){
public void run(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"看書中: "+i);
}
}
};
//建立一個任務3-聽歌
Runnable task3 = new Runnable(){
public void run(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"聽歌中: "+i);
}
}
};
//提交3個任務給執行緒池去分配執行緒執行
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
}
可以看出,Executor執行緒排程框架分配了3個執行緒去分別執行了這3個任務,使得程式變成了併發的,提高了工作的效率。
4.3、多執行緒排程模式-使用Callable< T >
當然,我們也可以使用Callable< T >任務介面來完成上述的多執行緒排程例子:
@Test
public void fun14() throws ExecutionException, InterruptedException {
//使用Executors工廠類建立含3個執行緒的ExecutorService執行器
ExecutorService executor = Executors.newFixedThreadPool(3);
//建立一個任務1-煮水
Callable<Integer> task1 = new Callable<Integer>(){
public Integer call(){
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+"煮水中 :"+i);
}
return null;
}
};
//建立一個任務2-看書
Callable<Integer> task2 = new Callable<Integer>(){
@Override
public Integer call(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"看書中 :"+i);
}
return null;
}
};
//建立一個任務3-聽歌
Callable<Integer> task3 = new Callable<Integer>(){
@Override
public Integer call(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"聽歌中 :"+i);
}
return null;
}
};
//提交3個任務給執行緒池去分配執行緒執行並獲得執行緒狀態物件
Future<Integer> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
Future<Integer> future3 = executor.submit(task3);
}
五、ExecutorCompletionService
實現了CompletionService,將執行完成的任務放到阻塞佇列中,通過take或poll方法來獲得執行結果Future,然後呼叫Future的get方法即可獲得響應的結果了。
@Test
public void demo4() throws InterruptedException, ExecutionException {
CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
//獲得含有快取執行緒池的Executor
ExecutorService executor = Executors.newCachedThreadPool();
//建立一個ExecutorCompletionService,將executor交給它管理
ExecutorCompletionService completionService = new ExecutorCompletionService(executor);
for (int i = 0; i < 10; i++) {
int result = i;
//建立一個響應式任務,迴圈執行10次
Callable<Object> task = new Callable<Object>(){
public Object call() throws InterruptedException {
return result;
}
};
//提交任務
completionService.submit(task);
//將結果放入集合中
Object o = completionService.take().get();
list.add((Integer)o);
}
//將集合遍歷
for (Object o : list) {
System.out.print((Integer)o+"\t");
}
}
六、ExecutorService的生命週期管理
ExecutorService繼承自Executor介面,定義了一些生命週期的方法:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
}
6.1、shutdown方法
這個方法用於會強制關閉ExecutorService,它將取消所有執行中的任務和在工作佇列中等待的任務。
6.2、shutdownNow方法
這個方法會強制關閉ExecutorService,它將取消所有執行中的任務和在工作佇列中等待的任務,這個方法返回一個List列表,列表中返回的是等待在工作佇列中的任務。
6.3、isShutdown方法
這個方法在ExecutorService關閉後返回true,否則返回false。
6.4、isTerminated方法
這個方法會校驗ExecutorService當前的狀態是否為“TERMINATED”即關閉狀態,當為關閉狀態時時返回true否則返回false。
6.5、awaitTermination方法
這個方法有兩個引數,一個是timeout即超時時間,另一個是unit即時間單位。這個方法會使執行緒等待timeout時長,當超過timeout時間後,會監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。