1. 程式人生 > >關閉執行緒的正確方法

關閉執行緒的正確方法

執行緒在啟動之後,正常的情況下會執行到任務完成,但是有的情況下會需要提前結束任務,如使用者取消操作等。可是,讓執行緒安全、快速和可靠地停止並不是件容易的事情,因為Java中沒有提供安全的機制來終止執行緒。雖然有Thread.stop/suspend等方法,但是這些方法存在缺陷,不能保證執行緒中共享資料的一致性,所以應該避免直接呼叫。

執行緒在終止的過程中,應該先進行操作來清除當前的任務,保持共享資料的一致性,然後再停止。

慶幸的是,Java中提供了中斷機制,來讓多執行緒之間相互協作,由一個程序來安全地終止另一個程序。

1. 任務的取消

如果外部的程式碼能在某個操作正常完成之前將其設定為完成狀態,則該操作為可取消的

Cancellable)。

操作被取消的原因有很多,比如超時,異常,請求被取消等等。

一個可取消的任務要求必須設定取消策略,即如何取消,何時檢查取消命令,以及接收到取消命令之後如何處理。

最簡單的取消辦法就是利用取消標誌位,如下所示:

public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    private final List<BigInteger> primes
            = new
ArrayList<BigInteger>(); //取消標誌位 private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; //每次在生成下一個素數時堅持是否取消 //如果取消,則退出 while (!cancelled) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public
void cancel()
{ cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<BigInteger>(primes); } static List<BigInteger> aSecondOfPrimes() throws InterruptedException { PrimeGenerator generator = new PrimeGenerator(); exec.execute(generator); try { SECONDS.sleep(1); } finally { generator.cancel(); } return generator.get(); } }

這段程式碼用於生成素數,並在任務執行一秒鐘之後終止。其取消策略為:通過改變取消標誌位取消任務,任務在每次生成下一隨機素數之前檢查任務是否被取消,被取消後任務將退出。

然而,該機制的最大的問題就是無法應用於擁塞方法。假設在迴圈中呼叫了擁塞方法,任務可能因擁塞而永遠不會去檢查取消標誌位,甚至會造成永遠不能停止。

1.1 中斷

為了解決擁塞方法帶來的問題,就需要使用中斷機制來取消任務。

雖然在Java規範中,執行緒的取消和中斷沒有必然聯絡,但是在實踐中發現:中斷是取消執行緒的最合理的方式

Thread類中和中斷相關的方法如下:

public class Thread {
    // 中斷當前執行緒
    public void interrupt();
    // 判斷當前執行緒是否被中斷
    public boolen isInterrupt();
    // 清除當前執行緒的中斷狀態,並返回之前的值
    public static boolen interrupt();   
}

呼叫Interrupt方法並不是意味著要立刻停止目標執行緒,而只是傳遞請求中斷的訊息。所以對於中斷操作的正確理解為:正在執行的執行緒收到中斷請求之後,在下一個合適的時刻中斷自己。

使用中斷方法改進素數生成類如下:

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            //使用中斷的方式來取消任務
            while (!Thread.currentThread().isInterrupted())
                //put方法會隱式檢查並響應中斷
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* 允許任務退出 */
        }
    }

    public void cancel() {
        interrupt();
    }
}

程式碼中有兩次檢查中斷請求:

  • 第一次是在迴圈開始前,顯示檢查中斷請求;
  • 第二次是在put方法,該方法為擁塞的,會隱式堅持當前執行緒是否被中斷;

1.2 中斷策略

和取消策略類似,可以被中斷的任務也需要有中斷策略:
即如何中斷,合適檢查中斷請求,以及接收到中斷請求之後如何處理。

由於每個執行緒擁有各自的中斷策略,因此除非清楚中斷對目標執行緒的含義,否者不要中斷該執行緒。

正是由於以上原因,大多數擁塞的庫函式在檢測到中斷都是丟擲中斷異常(InterruptedException)作為中斷響應,讓執行緒的所有者去處理,而不是去真的中斷當前執行緒。

雖然有人質疑Java沒有提供搶佔式的中斷機制,但是開發人員通過處理中斷異常的方法,可以定製更為靈活的中斷策略,從而在響應性和健壯性之間做出合理的平衡。

一般情況的中斷響應方法為:

  1. 傳遞異常:收到中斷異常之後,直接將該異常丟擲;
  2. 回覆中斷狀態:即再次呼叫Interrupt方法,恢復中斷狀態,讓呼叫堆疊的上層能看到中斷狀態進而處理它。

切記,只有實現了執行緒中斷策略的程式碼才能遮蔽中斷請求,在常規的任務和庫程式碼中都不應該遮蔽中斷請求。中斷請求是執行緒中斷和取消的基礎。

1.3 定時執行

定時執行一個任務是很常見的場景,很多問題是很費時間的,就需在規定時間內完成,如果沒有完成則取消任務。

以下程式碼就是一個定時執行任務的例項:

public class TimedRun1 {
    private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

    public static void timedRun(Runnable r,
                                long timeout, TimeUnit unit) {
        final Thread taskThread = Thread.currentThread();
        cancelExec.schedule(new Runnable() {
            public void run() {
                // 中斷執行緒,
                // 違規,不能在不知道中斷策略的前提下呼叫中斷,
                // 該方法可能被任意執行緒呼叫。
                taskThread.interrupt();
            }
        }, timeout, unit);
        r.run();
    }
}

很可惜,這是反面的例子,因為timedRun方法在不知道Runnable物件的中斷策略的情況下,就中斷該任務,這樣會承擔很大的風險。而且如果Runnable物件不支援中斷, 則該定時模型就會失效。

為了解決上述問題,就需要執行任務都執行緒有自己的中斷策略,如下:

public class LaunderThrowable {
    public static RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException)
            return (RuntimeException) t;
        else if (t instanceof Error)
            throw (Error) t;
        else
            throw new IllegalStateException("Not unchecked", t);
    }
}

public class TimedRun2 {
    private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

    public static void timedRun(final Runnable r,
                                long timeout, TimeUnit unit)
            throws InterruptedException {
        class RethrowableTask implements Runnable {
            private volatile Throwable t;

            public void run() {
                try {
                    r.run();
                } catch (Throwable t) {
                    //中斷策略,儲存當前丟擲的異常,退出
                    this.t = t;
                }
            }

            // 再次丟擲異常
            void rethrow() {
                if (t != null)
                    throw launderThrowable(t);
            }
        }

        RethrowableTask task = new RethrowableTask();
        final Thread taskThread = new Thread(task);
        //開啟任務子執行緒
        taskThread.start();
        //定時中斷任務子執行緒
        cancelExec.schedule(new Runnable() {
            public void run() {
                taskThread.interrupt();
            }
        }, timeout, unit);

        //限時等待任務子執行緒執行完畢
        taskThread.join(unit.toMillis(timeout));
        //嘗試丟擲task在執行中丟擲到異常
        task.rethrow();
    }
}

無論Runnable物件是否支援中斷,RethrowableTask物件都會記錄下來發生的異常資訊並結束任務,並將該異常再次丟擲。

1.4 通過Future取消任務

Future用來管理任務的生命週期,自然也可以來取消任務,呼叫Future.cancel方法就是用中斷請求結束任務並退出,這也是Executor的預設中斷策略。

用Future實現定時任務的程式碼如下:

public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r,
                                long timeout, TimeUnit unit)
            throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // 因超時而取消任務
        } catch (ExecutionException e) {
            // 任務異常,重新丟擲異常資訊
            throw launderThrowable(e.getCause());
        } finally {
            // 如果該任務已經完成,將沒有影響
            // 如果任務正在執行,將因為中斷而被取消
            task.cancel(true); // interrupt if running
        }
    }
}

1.5 不可中斷的擁塞

一些的方法的擁塞是不能響應中斷請求的,這類操作以I/O操作居多,但是可以讓其丟擲類似的異常,來停止任務:

  • Socket I/O: 關閉底層socket,所有因執行讀寫操作而擁塞的執行緒會丟擲SocketException
  • 同步 I/O:大部分Channel都實現了InterruptiableChannel介面,可以響應中斷請求,丟擲異常ClosedByInterruptException;
  • Selector的非同步 I/O:Selector執行select方法之後,再執行closewakeUp方法就會丟擲異常ClosedSelectorException

以套接字為例,其利用關閉socket物件來響應異常的例項如下:

public class ReaderThread extends Thread {
    private static final int BUFSZ = 512;
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    public void interrupt() {
        try {
            // 關閉套接字
            // 此時in.read會丟擲異常
            socket.close();
        } catch (IOException ignored) {
        } finally {
            // 正常的中斷
            super.interrupt();
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { 
            // 如果socket關閉,in.read方法將會丟擲異常
            // 藉此機會,響應中斷,執行緒退出
        }
    }

    public void processBuffer(byte[] buf, int count) {
    }
}

2. 停止基於執行緒的服務

一個應用程式是由多個服務構成的,而每個服務會擁有多個執行緒為其工作。當應用程式關閉服務時,由服務來關閉其所擁有的執行緒。服務為了便於管理自己所擁有的執行緒,應該提供生命週期方來關閉這些執行緒。對於ExecutorService,其包含執行緒池,是其下屬執行緒的擁有者,所提供的生命週期方法就是shutdownshutdownNow方法。

如果服務的生命週期大於所建立執行緒的生命週期,服務就應該提供生命週期方法來管理執行緒。

2.1 強行關閉和平緩關閉

我們以日誌服務為例,來說明兩種關閉方式的不同。首先,如下程式碼是不支援關閉的日誌服務,其採用多生產者-單消費者模式,生產者將日誌訊息放入擁塞佇列中,消費者從佇列中取出日誌打印出來。

public class LogWriter {
    // 擁塞佇列作為快取區
    private final BlockingQueue<String> queue;
    // 日誌執行緒
    private final LoggerThread logger;
    // 佇列大小
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        //執行緒安全的位元組流
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

如果沒有終止操作,以上任務將無法停止,從而使得JVM也無法正常退出。但是,讓以上的日誌服務停下來其實並非難事,因為擁塞佇列的take方法支援響應中斷,這樣直接關閉服務的方法就是強行關閉,強行關閉的方式不會去處理已經提交但還未開始執行的任務。

但是,關閉日誌服務前,擁塞佇列中可能還有沒有及時打印出來的日誌訊息,所以強行關閉日誌服務並不合適,需要等佇列中已經存在的訊息都列印完畢之後再停止,這就是平緩關閉,也就是在關閉服務時會等待已提交任務全部執行完畢之後再退出。

除此之外,在取消生產者-消費者操作時,還需要同時告知消費者和生產者相關操作已經被取消。

平緩關閉的日誌服務如下,其採用了類似訊號量的方式記錄佇列中尚未處理的訊息數量。

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    @GuardedBy("this") private boolean isShutdown;
    // 訊號量 用來記錄佇列中訊息的個數
    @GuardedBy("this") private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            //同步方法判斷是否關閉和修改資訊量
            if (isShutdown) // 如果已關閉,則不再允許生產者將訊息新增到佇列,會丟擲異常
                throw new IllegalStateException(/*...*/);
            //如果在工作狀態,訊號量增加
            ++reservations;
        }
        // 訊息入佇列;
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        //同步方法讀取關閉狀態和資訊量
                        synchronized (LogService.this) {
                            //如果程序被關閉且佇列中已經沒有訊息了,則消費者退出
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        // 取出訊息
                        String msg = queue.take();
                        // 消費訊息前,修改訊號量
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

2.2 關閉ExecutorService

ExecutorService中,其提供了shutdownshutdownNow方法來分別實現平緩關閉和強制關閉:

  • shutdownNow:強制關閉,響應速度快,但是會有風險,因為有任務肯執行到一半被終止;
  • shutdown:平緩關閉,響應速度較慢,會等到全部已提交的任務執行完畢之後再退出,更為安全。

這裡還需要說明下shutdownNow方法的侷限性,因為強行關閉直接關閉執行緒,所以無法通過常規的方法獲得哪些任務還沒有被執行。這就會導致我們無紡知道執行緒的工作狀態,就需要服務自身去記錄任務狀態。如下為示例程式碼:

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;

    //被取消任務的佇列
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // 如果當前任務被中斷且執行器被關閉,則將該任務加入到容器中
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

3. 處理非正常執行緒終止

導致執行緒非正常終止的主要原因就是RuntimeException,其表示為不可修復的錯誤。一旦子執行緒丟擲異常,該異常並不會被父執行緒捕獲,而是會直接丟擲到控制檯。所以要認真處理執行緒中的異常,儘量設計完備的try-catch-finally程式碼塊。

當然,異常總是會發生的,為了處理能主動解決未檢測異常問題,Thread.API提供了介面UncaughtExceptionHandler

public interface UncaughtExceptionHandler {
    void uncaughtException(Thread t, Throwable e);
}

如果JVM發現一個執行緒因未捕獲異常而退出,就會把該異常交個Thread物件設定的UncaughtExceptionHandler來處理,如果Thread物件沒有設定任何異常處理器,那麼預設的行為就是上面提到的丟擲到控制檯,在System.err中輸出。

Thread物件通過setUncaughtExceptionHandler方法來設定UncaughtExceptionHandler,比如這樣:

public class WitchCaughtThread  
{  
    public static void main(String args[])  
    {  
        Thread thread = new Thread(new Task());  
        thread.setUncaughtExceptionHandler(new ExceptionHandler());  
        thread.start();  
    }  
}  
  
class ExceptionHandler implements UncaughtExceptionHandler  
{  
    @Override  
    public void uncaughtException(Thread t, Throwable e)  
    {  
        System.out.println("==Exception: "+e.getMessage());  
    }  
}  

同樣可以為所有的Thread設定一個預設的UncaughtExceptionHandler,通過呼叫Thread.setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)方法,這是Thread的一個static方法。

下面是一個例子,即發生為捕獲異常時將異常寫入日誌:

public class UEHLogger implements Thread.UncaughtExceptionHandler {

    // 將未知的錯誤計入到日誌中
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

Executor框架中,需要將異常的捕獲封裝到Runnable或者Callable中並通過execute提交的任務,才能將它丟擲的異常交給UncaughtExceptionHandler,而通過submit提交的任務,無論是丟擲的未檢測異常還是已檢查異常,都將被認為是任務返回狀態的一部分。如果一個由submit提交的任務由於丟擲了異常而結束,那麼這個異常將被Future.get封裝在ExecutionException中重新丟擲。

public class ExecuteCaught  
{  
    public static void main(String[] args)  
    {  
        ExecutorService exec = Executors.newCachedThreadPool();  
        exec.execute(new ThreadPoolTask());  
        exec.shutdown();  
    }  
}  
  
class ThreadPoolTask implements Runnable  
{  
    @Override  
    public void run()  
    {  
        Thread.currentThread().setUncaughtExceptionHandler(new ExceptionHandler());  
        System.out.println(3/2);  
        System.out.println(3/0);  
        System.out.println(3/1);  
    }  
}