1. 程式人生 > >《JAVA併發程式設計實戰》取消和關閉

《JAVA併發程式設計實戰》取消和關閉

文章目錄

引言

java沒有提供任何機制來安全的終止執行緒。但它提供了一種協作機制:中斷。

這種協作式的方法是必要的,我們很少希望某個任務、執行緒或服務立即停止,因為這種立即停止會導致資料結構處於不一致的狀態。

任務取消

取消某個操作的原因可能是:

  • 使用者請求取消:點選取消按鈕等操作
  • 有時間限制的操作
  • 應用程式事件:例如應用程式對某個問題空間進行分解並搜尋,從而使不同的任務可以搜尋不同區域,當其中一個任務找到了解決方案時,所有其他仍在搜尋的任務都要被取消。
  • 錯誤:爬蟲下載資源時,磁碟滿,可能所有任務都將被取消
  • 關閉:當一個程式或服務關閉時,必須對正在處理和等待處理的工作執行某種操作。

在Java中沒有一種安全的搶佔方法停止執行緒,因此也就沒有安全的搶佔方法來停止任務。

其中一種協作機制能設定某個“已請求取消”標識,而任務將定期檢視該標誌。如果設定了這個標誌,那麼任務將提前結束。

public class PrimeGenerator implements Runnable {
    private final List<BigInteger> primes = new ArrayList<>();
    private volatile boolean cancelled;
    
    public void run()
{ BigInteger p = BigInteger.ONE; while(!cancelled) { p = p.nextProbablePrime(); synchronized (this){ primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get(){ return new ArrayList<BigInteger>(primes); } } List<BigInteger> aSecondOfPrimes() throws InterruptedException{ PrimeGenerator generator = new PrimeGenerator(); new Thread(generator).start(); try { SECONDS.sleep(1); } finally { generator.cancel(); } return generator.get(); }

一個可取消的任務必須擁有取消策略,在策略中詳細的定義取消操作的"How",“When"以及"What”.即其他程式碼如何取消該任務,任務在何時(When)檢查是否已經請求了取消,以及在響應取消請求時應該執行哪些操作。

中斷

PrimeGenerator中取消的機制最終會使得搜尋素數的任務退出,但這退出過程中需要花費一定的時間。然而,如果使用這個方法的任務呼叫了一個阻塞方法,例如BlockingQueue.put,那麼可能會產生一個更嚴重的問題——任務可能永遠不會檢查取消標識,因此永遠不會結束。

class BrokenPrimeProducer extends Thread{
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;
    
    BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
        this.queue = queue;
    }
    
    public void run(){
        try{
            BigInteger p = BigInteger.ONE;
            while(!cancelled){
                queue.put(p = p.nextProbablePrime());
            }
        } catch(InterruptedException consumed){
            
        }
    }
    
    public void cancel() {
        cancelled = true;
    }
}

void consumePrimes() throws InterruptedException{
    BlockingQueue<BigInteger> primes = ...;
    BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
    producer.start();
    try{
        while(needMorePrimes()){
            consume(primes.take());
        } 
    } finally {
        producer.cancel();
    }
}

生產者執行緒生產素數,並將它們放入到一個阻塞佇列。如果生產者的速度超過了消費者的處理速度,佇列將被填滿,put方法也會阻塞。當生產者在put中阻塞時,如果消費者希望取消生產者任務,那麼在它呼叫cancel方法設定cancelled標誌時,生產者卻永遠不能檢查這個標誌,因為它無法從阻塞的put方法中恢復過來。

每個執行緒都有一個boolean型別的中斷狀態。當中斷執行緒時,將這個中斷狀態設定為true。在Thread中包含了中斷執行緒以及查詢執行緒中斷狀態的方法。

public class Thread{
    public void interrupt(){}
    public void boolean isInterrupted(){}
    public static boolean interrupted(){}
}

interrupt中斷目標執行緒,isInterrupted方法返回目標執行緒的中斷狀態,靜態的interrupted方法清除當前執行緒的中斷狀態。

阻塞庫方法,例如Thread.sleep和Object.wait等,都會檢查執行緒何時中斷,且在發現中斷時提前返回,他們在響應中斷時執行的操作包括:清除中斷狀態,丟擲InterruptedException,表示阻塞操作由於中斷而提前結束。JVM並不能保證阻塞方法檢測到中斷的速度,但在實際情況中響應速度還是非常快的。

當執行緒中非阻塞狀態下中斷時,它的中斷狀態將被設定,然後根據被取消的操作來檢查中斷狀態以判斷髮生了中斷。通過這樣的方法,中斷操作將變得有粘性——如果不觸發InterruptedException,那麼中斷狀態將一直保持,直到明確的清除中斷狀態。

呼叫interrupt並不意味著立即停止目標執行緒正在進行的工作,而只是傳遞了請求中斷的訊息。

對中斷操作的正確理解是:它並不會真正的中斷一個正在執行的執行緒,而只是發出中斷請求,然後由執行緒在下一個合適的時刻中斷自己。

在使用靜態的interrupted時應該小心,因為它會清除當前執行緒的中斷狀態。如果在呼叫interrupted時返回了true,那麼除非要遮蔽這個中斷,否則必須對它進行處理——可以丟擲InterruptedException,也可以再次呼叫interrupt來恢復中斷狀態

通常,中斷是實現取消的最合理方式

class PrimeProducer extends Thread{
    private final BlockingQueue<BigInteger> queue;
    PrimeProducer(BlockingQueue<BigInteger> queue){
        this.queue = queue;
    }
    
    public void run(){
        try{
            BigInteger p = BigInteger.ONE;
            while(!Thread.currentThread().isInterrupted()){
                queue.put(p = p.nextProbablePrime());
            } 
        } catch(InterruptedException consumed) {
            /* 允許執行緒退出 */
        }
    }
    
    public void cancel(){
        interrupt();
    }
}

中斷策略

中斷策略規定執行緒如何解釋某個中斷請求——當發現中斷請求時,應該做哪些工作,哪些工作單元對於中斷來說是原子操作,以及以多快的速度來響應中斷。

最合理的中斷策略是以某種形式的執行緒級取消操作或者服務級取消操作:儘快退出,必要時進行清理,通知某個所有者該執行緒已經退出。此外還可以建立其他的中斷策略,例如暫停服務或重新開始服務。

區分任務和執行緒對中斷的反應非常重要。一箇中斷請求可以有一個或者多個接受者——中斷執行緒池中的某個工作者執行緒,同時意味著“取消當前任務”和“關閉工作者執行緒”。

任務不會在其自己擁有的執行緒中執行,而是在某個服務擁有的執行緒中執行。對於非執行緒所有者的程式碼來說(例如,對於執行緒池來說,如何線上程池實現以外的程式碼),應該小心的儲存中斷狀態,這樣擁有執行緒的程式碼才能對中斷做出相應。

執行緒應該只能由其所有者中斷,所有者可以將執行緒的中斷策略資訊封裝到某個合適的取消機制中,例如關閉方法。

由於每個執行緒擁有各自的中斷策略,因此除非你知道中斷對該執行緒的含義,否則就不應該中斷這個執行緒

響應中斷

在呼叫可中斷的阻塞函式時,例如Thread.sleep或BolckingQueue.put等,有兩種實用策略可以處理InterruptedException:

  • 傳遞異常
  • 恢復中斷狀態

將InterruptedException傳遞給呼叫者:

BlockingQueue<Task> queue;
public Task getNextTask() throws InterruptedException{
    return queue.take();
}

如果不想或者無法傳遞InterruptedException(或許通過Runnable來定義任務),那麼需要尋找另一種方式來儲存中斷請求。一種標準的方法就是通過再次呼叫interrupt來恢復中斷狀態。

只有實現了執行緒中斷策略的程式碼才可以遮蔽中斷請求,在常規的任務和庫程式碼中都不應該遮蔽中斷請求。

對於不支援取消但仍可以呼叫可中斷阻塞方法的操作,他們必須在迴圈中呼叫這些方法,並在發現中斷後重新嘗試。在這種情況下,他們應該在本地儲存中斷狀態,並在返回前回復狀態而不是在捕獲InterruptedException時恢復狀態。如果過早的設定中斷狀態,就可能引起無限迴圈,因為大多數可中斷的阻塞方法都會在入口處檢查中斷狀態,並且當發現該狀態已被設定時會立即丟擲InterruptedException(通常,可中斷的方法會在阻塞或進行重要的工作前首先檢查中斷,從而儘快的響應中斷)

不可取消的任務在退出前恢復中斷

public Task getNextTask(BlockingQueue<Task> queue) {
    boolean interrupted = false;
    try {
        while(true){
            try{
                return queue.take();    
            }catch(InterruptedException e) {
                interrupted = true;
            }
        } 
    } finally{
        if(interrupted){
            Thread.currentThread().interrupt();
        }
    }
}

如果程式碼不會呼叫可中斷的阻塞方法,那麼仍然可以通過在任務程式碼中輪詢當前執行緒的中斷狀態來響應中斷。

示例:計時執行

private static final ScheduledExecutorService cancelExec = ...;
public static void timedRun(Runnable r,long timeout,TimeUnit unit) {
    final Thread taskThread = Thread.currentThread();
    cancelExec.schedule(new Runnable(){
        public void run(){
            taskThread.interrupt();
        }
    },timeout,unit);
    r.run();
}

這是一種非常簡單的方法,然而卻破壞了以下規則:在中斷執行緒之前,應該瞭解它的中斷策略。由於timedRun可以從任意一個執行緒中呼叫,因此它無法知道這個呼叫執行緒的中斷策略。如果任務在超時之前完成,那麼中斷timedRun所線上程的取消任務將在timedRun返回到呼叫者後啟動。

而且,如果任務不響應中斷,那麼timedRun會在任務結束時才返回,此時可能已經超過了指定的時限。

public static void timedRun(final Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
    class RethrowableTask implements Runnable{
        private volatile Throwable t;
        public void run(){
            try{
                r.run();
            } catch(Throwable t){
                this.t = t;
            }
        }
        void rethrow(){
            if(t != null) {
                throw launderThrowable(t);
            }
        }    
    }
    
    RethrowableTask task = new RethrowableTask();
    final Thread taskThread = new Thread(task);
    taskThread.start();
    cancelExec.schedule(new Runnable(){
        public void run() {
            taskThread.interrupt();
        }
    },timeout,unit);
    taskThread.join(unit.toMillis(timeout));
    task.rethrow();
    
    
}

這個示例的程式碼解決了前面示例中的問題,但由於它依賴於一個限時的join,因此存在著join的不足:無法知道執行控制是因為執行緒正常退出而返回還是因為join超時而返回。

通過Future來實現取消

ExecutorService.submit返回一個Future來描述任務。Future有一個cancel方法,該方法帶有一個boolean型別的引數mayInterruptIfRunning,表示取消操作是否成功(這只是表示任務是否能夠接收中斷,而不是表示任務能否檢測並處理中斷)。如果該引數為true並且任務當前正在某個執行緒中執行,那麼這個執行緒能被中斷。如果這個引數為false,那麼意味著“若任務還沒有啟動,那就不要啟動它”,這種方式應該用於那些不處理中斷的任務中。

除非你清楚執行緒的中斷策略,否則不要中斷執行緒。當嘗試取消某個任務時,不宜直接中斷執行緒池。

public static void timeRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
    Future<?> task = taskExec.submit(r);
    try{
        task.get(timeout,unit);
    } catch(TimeoutException e){
        //接下來任務將被取消
    } catch(ExecutionException e){
        throw launderThrowable(e.getCauese());
    } finally {
        //如果任務已經結束,那麼執行取消操作也不會帶來任何影響
        //如果任務正在執行,那麼將被打斷
        task.cancel(true);
    }
}

當Future.get丟擲InterruptedException或TimeoutException 時,如果你知道不再需要結果,那麼就可以呼叫Future.cancel取消任務

處理不可中斷的阻塞

並非所有的可阻塞方法或者阻塞機制都能響應中斷;如果一個執行緒由於執行同步的Socket I/O或者等待獲得內建鎖而阻塞,那麼中斷請求只能設定執行緒的中斷狀態,除此之外沒有其他任何作用。

由於執行不可中斷操作而被阻塞的執行緒,可以使用類似於中斷的手段來停止這些執行緒,但這要求我們必須知道執行緒阻塞的原因。

  1. java.io包中的同步Socket I/O。在伺服器應用程式中,最常見的阻塞I/O形式就是對套接字進行讀取和寫入。雖然InputStream和OutputStream中的read和write等方法都不會響應中斷,但是通過關閉底層的套接字,可以使得由於執行read或write等方法被阻塞的執行緒丟擲一個SocketException
  2. java.io包中的同步I/O。當中斷一個正在InterruptibleChannel上等待的執行緒時,將丟擲ClosedByInterruptException並關閉鏈路。當關閉一個InterruptibleChannel時,將導致所有在鏈路操作上阻塞的執行緒都丟擲AsynchronousCloseException。大多數的Channel都實現了InterruptibleChannel.
  3. Selector的非同步I/O。如果一個執行緒在呼叫Selector.select方法時阻塞了,那麼呼叫close或wakeup方法會使執行緒丟擲ClosedSelectorException並提前返回。
  4. 獲取某個鎖。如果一個執行緒由於等待某個內建鎖而阻塞,那麼將無法響應中斷。在Lock類中提供了lockInterruptibly方法,該方法允許在等待一個鎖的同時仍能響應中斷。

下面展示的是如何封裝非標準的取消操作。

public class ReaderThread extends Thread{
    private final Socket socket;
    private final InputStream in;
    public ReaderThread(Socket socket) throws IOException{
        this.socket = socket;
        this.in = socket.getInputStream();
    }
    
    public void interrupt(){
        try{
            socket.close();
        }catch(IOException ignored){
            
        }finally{
            super.interrupt();
        }
        
    }
    
    
    public void run(){
        try{
            byte[] buf = new byte[1024];
            while(true){
                int count = in.read(buf);
                if(count < 0) {
                    break;
                } else if(count > 0) {
                    processBuffer(buf,count);
                }
            }
        } catch(IOException e){
            /*允許執行緒退出*/
        }
    }
}

採用newTaskFor封裝非標準的取消

當把一個Callable提交給ExecutorService時,submit方法會返回一個Future,我們可以通過這個Future來取消任務。newTaskFor是一個工廠方法,它將建立Future來代表任務。newTaskFor還能返回一個RunnableFuture介面,該介面拓展了Future和Runnable(並由FutureTask實現)。

通過定製表示任務的Future可以改變Future.cancel的行為。例如定製的取消程式碼可以實現日誌記錄或者收集取消操作的統計資訊,以及取消一些不響應中斷的操作。通過改寫interrupt方法,ReaderThread可以取消基於套接字的執行緒。同樣,通過改寫任務的Future,cancel方法也可以實現類似的功能。

public interface CancellableTask<T> extends Callable<T>{
    void cancel();
    RunnableFuture<T> newTask();
}

public class CancellingExecutor extends ThreadPoolExecutor{
    protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if(callable instanceof CancellableTask){
            return ((CancellableTask<T>)callable).newTask();
        } else {
            return super.newTaskFor(callable);
        }
    }
}

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
    private Socket socket;
    protected synchronized void setSocket(Socket s){
        socket = s;
    }
    
    public synchronized void cancel(){
        try{
            if(socket!=null){
                socket.close();
            } 
        } catch(IOException ingnored){}
    }
    
    public RunnableFuture<T> newTask(){
        return new FutureTask<T>(this){
            public boolean cancel(boolean mayInterruptIfRunning){
                try{
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

停止基於執行緒的服務

應用程式通常會建立擁有多個執行緒的服務,例如執行緒池,並且這些服務的生命週期通常比建立它們的生命週期更長。如果應用程式準備退出,那麼這些服務所擁有的執行緒也需要結束。由於無法通過搶佔式的方法來停止執行緒,因此它們需要自行結束。

正確的封裝原則是:除非擁有某個執行緒,否則不能對該執行緒進行操控,例如,中斷執行緒或者修改執行緒優先順序等。

對於持有執行緒的服務,主要服務的存在時間大於建立執行緒的方法的存在時間,那麼就應該提供生命週期方法。

示例:日誌服務

public class LogWriter{
    private final BlockingQueue<String> queue;
    private final LoggerThread thread;
    
    public LogWriter(Writer writer){
        this.queue = new BlokingQueue<>();
        this.logger = new LoggerThread(write);
    }
    
    public void start(){
        logget.start();
    }
    
    public void log(String msg) throws InterruptedException{
        queue.put(msg);
    }
    
    private class LoggerThread extends Thread{
        private final PrintWriter writer;;
        
        public void run(){
            try{
                while(true){
                    writer.println(queue.take());
                }
            } catch(InterruptedException ignored){
                
            } finally {
                writer.close();
            }
            
        }
    }
}

要停止日誌執行緒是很容易的,因為它會反覆呼叫take,而take能響應中斷。如果日誌執行緒修改為捕獲到InterruptedException時退出,那麼只需要中斷日誌執行緒就能停止服務。

然而,如果只是使日誌執行緒退出,那麼還不是一種完備的關閉機制。這種直接關閉的做法會丟失那些正在等待被寫入到日誌的資訊,不僅如此,其他執行緒將在呼叫log時被阻塞,因為日誌訊息佇列是滿的,因此這些執行緒將無法解除阻塞狀態。

另一種關閉LogWriter的方法是:設定某個“已請求關閉”標誌,避免進一步提交日誌訊息。

public void log(String msg) throws InterruptedException{
    if(!shutdownRequested){
        queue.put(msg);
    }else {
        throw new IllegalStateException("logger is shut down");
    }