Java多執行緒之Executor框架<Callable、Future、Executor和ExecutorService>
引言
Executor框架是指JDK 1.5中引入的一系列併發庫中與Executor相關的功能類,包括Executor、Executors、ExecutorService、Future、Callable等。
一、為什麼要引入Executor框架?
1、如果使用new Thread(...).start()的方法處理多執行緒,有如下缺點:
① 開銷大。對於JVM來說,每次新建執行緒和銷燬執行緒都會有很大的開銷。
② 執行緒缺乏管理。沒有一個池來限制執行緒的數量,如果併發量很高,會建立很多執行緒,而且執行緒之間可能會有相互競爭,這將會過多佔用系統資源,增加系統資源的消耗量。而且執行緒數量超過系統負荷,容易導致系統不穩定。
2、使用執行緒池的方法,有如下優點:
① 執行緒複用。通過複用建立了的執行緒,減少了執行緒的建立、消亡的開銷。
② 有效控制併發執行緒數。
③ 提供了更簡單靈活的執行緒管理。可以提供定時執行、單執行緒、可變執行緒數等多種使用功能。
二、Executor框架的UML圖
三、下面開始分析一下Executor框架中幾個比較重要的介面和類。
1、Callable
Callable位於java.util.concurrent包下,它是一個介面,只聲明瞭一個call()方法。
Callable介面類似於Runnable,兩者都是為了可能線上程中執行的類而設計的。和Runnable介面中的run()方法類似,Callable 提供的call()方法作為執行緒的執行體。但是call()比run()方法更強大,體現在
① call方法有返回值。
② call方法可以宣告丟擲異常。
2、Future
Future 介面位於java.util.concurrent包下,是Java 1.5中引入的介面。
Future主要用來對具體的Runnable或Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get()方法獲取執行結果,get()方法會阻塞知道任務返回結果。
當你提交一個Callable物件給執行緒池時,將得到一個Future物件,並且它和你傳入的Callable示例有相同泛型。
Future 介面中的5個方法:
public interface Future<V> { //用來取消任務 //引數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務。 boolean cancel(boolean mayInterruptIfRunning); //表示任務是否被取消成功 boolean isCancelled(); //表示任務是否已經完成 boolean isDone(); //用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回; V get() //用來獲取執行結果,如果在指定時間內,還沒獲取到結果,會丟擲TimeoutException異常。 V get(long timeout, TimeUnit unit) }
Future提供了三種功能:
① 判斷任務是否完成
② 能夠中斷任務
③ 能夠獲取任務執行結果
3、Executor
Executor是一個介面,它將任務的提交與任務的執行分離開來,定義了一個接收Runnable物件的方法executor。Executor是Executor框架中最基礎的一個介面,類似於集合中的Collection介面。
4、ExecutorService
ExecutorService繼承了Executor,是一個比Executor使用更廣泛的子類介面。定義了終止任務、提交任務、跟蹤任務返回結果等方法。
一個ExecutorService是可以關閉的,關閉之後它將不能再接收任何任務,對於不在使用的ExecutorService,應該將其關閉以釋放資源。
ExecutorService方法介紹:
package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {
/**
* 平滑地關閉執行緒池,已經提交到執行緒池中的任務會繼續執行完。
*/
void shutdown();
/**
* 立即關閉執行緒池,返回還沒有開始執行的任務列表。
* 會嘗試中斷正在執行的任務(每個執行緒呼叫 interruput方法),但這個行為不一定會成功。
*/
List<Runnable> shutdownNow();
/**
* 判斷執行緒池是否已經關閉
*/
boolean isShutdown();
/**
* 判斷執行緒池的任務是否已經執行完畢。
* 注意此方法呼叫之前需要先呼叫shutdown()方法或者shutdownNow()方法,否則總是會返回false
*/
boolean isTerminated();
/**
* 判斷執行緒池的任務是否都執行完。
* 如果沒有任務沒有執行完畢則阻塞,直至任務完成或者達到了指定的timeout時間就會返回
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交帶有一個返回值的任務到執行緒池中去執行(回撥),返回的 Future 表示任務的待定結果。
* 當任務成功完成後,通過 Future 例項的 get() 方法可以獲取該任務的結果。
* Future 的 get() 方法是會阻塞的。
*/
<T> Future<T> submit(Callable<T> task);
/**
*提交一個Runnable的任務,當任務完成後,可以通過Future.get()獲取的是提交時傳遞的引數T result
*
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一個Runnable的人無語,它的Future.get()得不到任何內容,它返回值總是Null。
* 為什麼有這個方法?為什麼不直接設計成void submit(Runnable task)這種方式?
* 這是因為Future除了get這種獲取任務資訊外,還可以控制任務,
具體體現在 Future的這個方法上:boolean cancel(boolean mayInterruptIfRunning)
這個方法能夠去取消提交的Rannable任務。
*/
Future<?> submit(Runnable task);
/**
* 執行一組給定的Callable任務,返回對應的Future列表。列表中每一個Future都將持有該任務的結果和狀態。
* 當所有任務執行完畢後,方法返回,此時並且每一個Future的isDone()方法都是true。
* 完成的任務可能是正常結束,也可以是異常結束
* 如果當任務執行過程中,tasks集合被修改了,那麼方法的返回結果將是不確定的,
即不能確定執行的是修改前的任務,還是修改後的任務
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 執行一組給定的Callable任務,返回對應的Future列表。列表中每一個Future都將持有該任務的結果和狀態。
* 當所有任務執行完畢後或者超時後,方法將返回,此時並且每一個Future的isDone()方法都是true。
* 一旦方法返回,未執行完成的任務被取消,而完成的任務可能正常結束或者異常結束,
* 完成的任務可以是正常結束,也可以是異常結束
* 如果當任務執行過程中,tasks集合被修改了,那麼方法的返回結果將是不確定的
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 執行一組給定的Callable任務,當成功執行完(沒拋異常)一個任務後此方法便返回,返回的是該任務的結果
* 一旦此正常返回或者異常結束,未執行的任務都會被取消。
* 如果當任務執行過程中,tasks集合被修改了,那麼方法的返回結果將是不確定的
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 執行一組給定的Callable任務,當在timeout(超時)之前成功執行完(沒拋異常)一個任務後此方法便返回,返回的是該任務的結果
* 一旦此正常返回或者異常結束,未執行的任務都會被取消。
* 如果當任務執行過程中,tasks集合被修改了,那麼方法的返回結果將是不確定的
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
shutdown() 和 shutdownNow() 是用來關閉連線池的兩個方法,而且這兩個方法都是在當前執行緒立即返回,不會阻塞至執行緒池中的方法執行結束。呼叫這兩個方法之後,連線池將不能再接受任務。
下面給寫幾個示例來加深ExecutorService的方法的理解。
先寫兩個任務類:ShortTask和LongTask,這兩個類都繼承了Runnable介面,ShortTask的run()方法執行很快,LongTask的run()方法執行時間為10s。
public class LongTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("complete a long task");
}
}
public class ShortTask implements Runnable {
@Override
public void run() {
System.out.println("complete a short task...");
}
}
測試shutdown()方法
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
public class Client10 {
public static void main(String[] args) {
ExecutorService threadpool = Executors.newFixedThreadPool(4);
threadpool.submit(new ShortTask());
threadpool.submit(new ShortTask());
threadpool.submit(new LongTask());
threadpool.submit(new ShortTask());
threadpool.shutdown();
boolean isShutdown = threadpool.isShutdown();
System.out.println("執行緒池是否已經關閉:" + isShutdown);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
try {
while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)){
System.out.println("執行緒池中還有任務在執行,當前時間:" + sdf.format(new Date()));
}
System.out.println("執行緒池中已經沒有在執行的任務,執行緒池已完全關閉!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試shutdownNow()方法
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Client11 {
public static void main(String[] args) {
ExecutorService threadpool = Executors.newFixedThreadPool(3);
//將5個任務提交到有3個執行緒的執行緒池
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
threadpool.submit(new LongTask());
try {
TimeUnit.SECONDS.sleep(1L);
//關閉執行緒池
List<Runnable> waiteRunnables = threadpool.shutdownNow();
System.out.println("還沒有執行的任務數:" + waiteRunnables.size());
boolean isShutdown = threadpool.isShutdown();
System.out.println("執行緒池是否已經關閉:" + isShutdown);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
while (!threadpool.awaitTermination(1L, TimeUnit.SECONDS)) {
System.out.println("執行緒池中還有任務在執行,當前時間:" + sdf.format(new Date()));
}
System.out.println("執行緒池中已經沒有在執行的任務,執行緒池已完全關閉!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
當呼叫shutdownNow()後,三個執行的任務都被interrupt了。而且awaitTermination(1L, TimeUnit.SECONDS)返回的都是true。
測試submit(Callable<T> task)方法
package OSChina.Client;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class CallableTask implements Callable{
@Override
public Object call() throws Exception {
TimeUnit.SECONDS.sleep(5L);
return "success";
}
}
package OSChina.Client;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Client12 {
public static void main(String[] args) {
ExecutorService threadpool = null;
threadpool = Executors.newFixedThreadPool(3);
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println("提交一個callable任務到執行緒池,現在時間是:" + sdf.format(new Date()));
Future<String> future = threadpool.submit(new CallableTask());
try {
System.out.println("獲取callable任務的結果:" + future.get() + ",現在時間是:" + sdf.format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
if(threadpool!=null){
threadpool.shutdown();
}
}
}
}