關閉執行緒的正確方法
執行緒在啟動之後,正常的情況下會執行到任務完成,但是有的情況下會需要提前結束任務,如使用者取消操作等。可是,讓執行緒安全、快速和可靠地停止並不是件容易的事情,因為Java中沒有提供安全的機制來終止執行緒。雖然有Thread.stop/suspend等方法,但是這些方法存在缺陷,不能保證執行緒中共享資料的一致性,所以應該避免直接呼叫。
執行緒在終止的過程中,應該先進行操作來清除當前的任務,保持共享資料的一致性,然後再停止。
慶幸的是,Java中提供了中斷機制,來讓多執行緒之間相互協作,由一個程序來安全地終止另一個程序。
1. 任務的取消
如果外部的程式碼能在某個操作正常完成之前將其設定為完成狀態,則該操作為可取消的
操作被取消的原因有很多,比如超時,異常,請求被取消等等。
一個可取消的任務要求必須設定取消策略,即如何取消,何時檢查取消命令,以及接收到取消命令之後如何處理。
最簡單的取消辦法就是利用取消標誌位,如下所示:
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
private final List<BigInteger> primes
= new ArrayList<BigInteger>();
//取消標誌位
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
//每次在生成下一個素數時堅持是否取消
//如果取消,則退出
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
}
這段程式碼用於生成素數,並在任務執行一秒鐘之後終止。其取消策略為:通過改變取消標誌位取消任務,任務在每次生成下一隨機素數之前檢查任務是否被取消,被取消後任務將退出。
然而,該機制的最大的問題就是無法應用於擁塞方法。假設在迴圈中呼叫了擁塞方法,任務可能因擁塞而永遠不會去檢查取消標誌位,甚至會造成永遠不能停止。
1.1 中斷
為了解決擁塞方法帶來的問題,就需要使用中斷機制來取消任務。
雖然在Java規範中,執行緒的取消和中斷沒有必然聯絡,但是在實踐中發現:中斷是取消執行緒的最合理的方式。
Thread類中和中斷相關的方法如下:
public class Thread {
// 中斷當前執行緒
public void interrupt();
// 判斷當前執行緒是否被中斷
public boolen isInterrupt();
// 清除當前執行緒的中斷狀態,並返回之前的值
public static boolen interrupt();
}
呼叫Interrupt方法並不是意味著要立刻停止目標執行緒,而只是傳遞請求中斷的訊息。所以對於中斷操作的正確理解為:正在執行的執行緒收到中斷請求之後,在下一個合適的時刻中斷自己。
使用中斷方法改進素數生成類如下:
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
//使用中斷的方式來取消任務
while (!Thread.currentThread().isInterrupted())
//put方法會隱式檢查並響應中斷
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* 允許任務退出 */
}
}
public void cancel() {
interrupt();
}
}
程式碼中有兩次檢查中斷請求:
- 第一次是在迴圈開始前,顯示檢查中斷請求;
- 第二次是在put方法,該方法為擁塞的,會隱式堅持當前執行緒是否被中斷;
1.2 中斷策略
和取消策略類似,可以被中斷的任務也需要有中斷策略:
即如何中斷,合適檢查中斷請求,以及接收到中斷請求之後如何處理。
由於每個執行緒擁有各自的中斷策略,因此除非清楚中斷對目標執行緒的含義,否者不要中斷該執行緒。
正是由於以上原因,大多數擁塞的庫函式在檢測到中斷都是丟擲中斷異常(InterruptedException)作為中斷響應,讓執行緒的所有者去處理,而不是去真的中斷當前執行緒。
雖然有人質疑Java沒有提供搶佔式的中斷機制,但是開發人員通過處理中斷異常的方法,可以定製更為靈活的中斷策略,從而在響應性和健壯性之間做出合理的平衡。
一般情況的中斷響應方法為:
- 傳遞異常:收到中斷異常之後,直接將該異常丟擲;
- 回覆中斷狀態:即再次呼叫Interrupt方法,恢復中斷狀態,讓呼叫堆疊的上層能看到中斷狀態進而處理它。
切記,只有實現了執行緒中斷策略的程式碼才能遮蔽中斷請求,在常規的任務和庫程式碼中都不應該遮蔽中斷請求。中斷請求是執行緒中斷和取消的基礎。
1.3 定時執行
定時執行一個任務是很常見的場景,很多問題是很費時間的,就需在規定時間內完成,如果沒有完成則取消任務。
以下程式碼就是一個定時執行任務的例項:
public class TimedRun1 {
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);
public static void timedRun(Runnable r,
long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
// 中斷執行緒,
// 違規,不能在不知道中斷策略的前提下呼叫中斷,
// 該方法可能被任意執行緒呼叫。
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}
}
很可惜,這是反面的例子,因為timedRun方法在不知道Runnable物件的中斷策略的情況下,就中斷該任務,這樣會承擔很大的風險。而且如果Runnable物件不支援中斷, 則該定時模型就會失效。
為了解決上述問題,就需要執行任務都執行緒有自己的中斷策略,如下:
public class LaunderThrowable {
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}
public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
//中斷策略,儲存當前丟擲的異常,退出
this.t = t;
}
}
// 再次丟擲異常
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
//開啟任務子執行緒
taskThread.start();
//定時中斷任務子執行緒
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
//限時等待任務子執行緒執行完畢
taskThread.join(unit.toMillis(timeout));
//嘗試丟擲task在執行中丟擲到異常
task.rethrow();
}
}
無論Runnable物件是否支援中斷,RethrowableTask物件都會記錄下來發生的異常資訊並結束任務,並將該異常再次丟擲。
1.4 通過Future取消任務
Future用來管理任務的生命週期,自然也可以來取消任務,呼叫Future.cancel方法就是用中斷請求結束任務並退出,這也是Executor的預設中斷策略。
用Future實現定時任務的程式碼如下:
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 因超時而取消任務
} catch (ExecutionException e) {
// 任務異常,重新丟擲異常資訊
throw launderThrowable(e.getCause());
} finally {
// 如果該任務已經完成,將沒有影響
// 如果任務正在執行,將因為中斷而被取消
task.cancel(true); // interrupt if running
}
}
}
1.5 不可中斷的擁塞
一些的方法的擁塞是不能響應中斷請求的,這類操作以I/O操作居多,但是可以讓其丟擲類似的異常,來停止任務:
- Socket I/O: 關閉底層socket,所有因執行讀寫操作而擁塞的執行緒會丟擲SocketException;
- 同步 I/O:大部分Channel都實現了InterruptiableChannel介面,可以響應中斷請求,丟擲異常ClosedByInterruptException;
- Selector的非同步 I/O:Selector執行select方法之後,再執行close和wakeUp方法就會丟擲異常ClosedSelectorException。
以套接字為例,其利用關閉socket物件來響應異常的例項如下:
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
// 關閉套接字
// 此時in.read會丟擲異常
socket.close();
} catch (IOException ignored) {
} finally {
// 正常的中斷
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) {
// 如果socket關閉,in.read方法將會丟擲異常
// 藉此機會,響應中斷,執行緒退出
}
}
public void processBuffer(byte[] buf, int count) {
}
}
2. 停止基於執行緒的服務
一個應用程式是由多個服務構成的,而每個服務會擁有多個執行緒為其工作。當應用程式關閉服務時,由服務來關閉其所擁有的執行緒。服務為了便於管理自己所擁有的執行緒,應該提供生命週期方來關閉這些執行緒。對於ExecutorService,其包含執行緒池,是其下屬執行緒的擁有者,所提供的生命週期方法就是shutdown和shutdownNow方法。
如果服務的生命週期大於所建立執行緒的生命週期,服務就應該提供生命週期方法來管理執行緒。
2.1 強行關閉和平緩關閉
我們以日誌服務為例,來說明兩種關閉方式的不同。首先,如下程式碼是不支援關閉的日誌服務,其採用多生產者-單消費者模式,生產者將日誌訊息放入擁塞佇列中,消費者從佇列中取出日誌打印出來。
public class LogWriter {
// 擁塞佇列作為快取區
private final BlockingQueue<String> queue;
// 日誌執行緒
private final LoggerThread logger;
// 佇列大小
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
//執行緒安全的位元組流
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
如果沒有終止操作,以上任務將無法停止,從而使得JVM也無法正常退出。但是,讓以上的日誌服務停下來其實並非難事,因為擁塞佇列的take方法支援響應中斷,這樣直接關閉服務的方法就是強行關閉,強行關閉的方式不會去處理已經提交但還未開始執行的任務。
但是,關閉日誌服務前,擁塞佇列中可能還有沒有及時打印出來的日誌訊息,所以強行關閉日誌服務並不合適,需要等佇列中已經存在的訊息都列印完畢之後再停止,這就是平緩關閉,也就是在關閉服務時會等待已提交任務全部執行完畢之後再退出。
除此之外,在取消生產者-消費者操作時,還需要同時告知消費者和生產者相關操作已經被取消。
平緩關閉的日誌服務如下,其採用了類似訊號量的方式記錄佇列中尚未處理的訊息數量。
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
// 訊號量 用來記錄佇列中訊息的個數
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
//同步方法判斷是否關閉和修改資訊量
if (isShutdown) // 如果已關閉,則不再允許生產者將訊息新增到佇列,會丟擲異常
throw new IllegalStateException(/*...*/);
//如果在工作狀態,訊號量增加
++reservations;
}
// 訊息入佇列;
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
//同步方法讀取關閉狀態和資訊量
synchronized (LogService.this) {
//如果程序被關閉且佇列中已經沒有訊息了,則消費者退出
if (isShutdown && reservations == 0)
break;
}
// 取出訊息
String msg = queue.take();
// 消費訊息前,修改訊號量
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
2.2 關閉ExecutorService
在ExecutorService中,其提供了shutdown和shutdownNow方法來分別實現平緩關閉和強制關閉:
- shutdownNow:強制關閉,響應速度快,但是會有風險,因為有任務肯執行到一半被終止;
- shutdown:平緩關閉,響應速度較慢,會等到全部已提交的任務執行完畢之後再退出,更為安全。
這裡還需要說明下shutdownNow方法的侷限性,因為強行關閉直接關閉執行緒,所以無法通過常規的方法獲得哪些任務還沒有被執行。這就會導致我們無紡知道執行緒的工作狀態,就需要服務自身去記錄任務狀態。如下為示例程式碼:
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
//被取消任務的佇列
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
public void shutdown() {
exec.shutdown();
}
public List<Runnable> shutdownNow() {
return exec.shutdownNow();
}
public boolean isShutdown() {
return exec.isShutdown();
}
public boolean isTerminated() {
return exec.isTerminated();
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return exec.awaitTermination(timeout, unit);
}
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
// 如果當前任務被中斷且執行器被關閉,則將該任務加入到容器中
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
3. 處理非正常執行緒終止
導致執行緒非正常終止的主要原因就是RuntimeException,其表示為不可修復的錯誤。一旦子執行緒丟擲異常,該異常並不會被父執行緒捕獲,而是會直接丟擲到控制檯。所以要認真處理執行緒中的異常,儘量設計完備的try-catch-finally程式碼塊。
當然,異常總是會發生的,為了處理能主動解決未檢測異常問題,Thread.API提供了介面UncaughtExceptionHandler。
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}
如果JVM發現一個執行緒因未捕獲異常而退出,就會把該異常交個Thread物件設定的UncaughtExceptionHandler來處理,如果Thread物件沒有設定任何異常處理器,那麼預設的行為就是上面提到的丟擲到控制檯,在System.err中輸出。
Thread物件通過setUncaughtExceptionHandler方法來設定UncaughtExceptionHandler,比如這樣:
public class WitchCaughtThread
{
public static void main(String args[])
{
Thread thread = new Thread(new Task());
thread.setUncaughtExceptionHandler(new ExceptionHandler());
thread.start();
}
}
class ExceptionHandler implements UncaughtExceptionHandler
{
@Override
public void uncaughtException(Thread t, Throwable e)
{
System.out.println("==Exception: "+e.getMessage());
}
}
同樣可以為所有的Thread設定一個預設的UncaughtExceptionHandler,通過呼叫Thread.setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)方法,這是Thread的一個static方法。
下面是一個例子,即發生為捕獲異常時將異常寫入日誌:
public class UEHLogger implements Thread.UncaughtExceptionHandler {
// 將未知的錯誤計入到日誌中
public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}
在Executor框架中,需要將異常的捕獲封裝到Runnable或者Callable中並通過execute提交的任務,才能將它丟擲的異常交給UncaughtExceptionHandler,而通過submit提交的任務,無論是丟擲的未檢測異常還是已檢查異常,都將被認為是任務返回狀態的一部分。如果一個由submit提交的任務由於丟擲了異常而結束,那麼這個異常將被Future.get封裝在ExecutionException中重新丟擲。
public class ExecuteCaught
{
public static void main(String[] args)
{
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ThreadPoolTask());
exec.shutdown();
}
}
class ThreadPoolTask implements Runnable
{
@Override
public void run()
{
Thread.currentThread().setUncaughtExceptionHandler(new ExceptionHandler());
System.out.println(3/2);
System.out.println(3/0);
System.out.println(3/1);
}
}