1. 程式人生 > >Java高併發程式設計:取消和關閉

Java高併發程式設計:取消和關閉

Java沒有提供任何機制來安全地終止執行緒,但是它提供了中斷(Interruption).這是一種協作機制,能夠使一個執行緒終止另一個執行緒當前的工作。

  • 在對一個執行緒物件呼叫Thread.interrupted()方法之後,一般情況下對這個執行緒不會產生任何影響。因為呼叫Thread.interrupted()方法只是將增執行緒的中斷標誌位設定為true。
  • 如果一個執行緒被呼叫Thread.interrupted()方法之後,如果它的狀態是阻塞狀態或者是等待狀態,而且這個狀態正是因為正在執行的wait、join、sleep執行緒造成的,那麼是會改變執行的結果(丟擲InterruptException異常)

1.執行緒終止

由於Java沒有提供任何機制來安全地終止執行緒,那麼我們應該如何終止執行緒呢?下面我們提供三種執行緒終止的方法:

  1. 使用退出標誌,使執行緒正常退出,也就是當run方法完成後執行緒終止。
  2. 使用stop方法強行終止執行緒(這個方法不推薦使用,因為stop和suspend、resume一樣,也可能發生不可預料的結果)。
  3. 使用interrupt方法中斷執行緒。

1.1 使用退出標誌

當run方法執行完後,執行緒就會退出。但有時run方法是永遠不會結束的。如在服務端程式中使用執行緒進行監聽客戶端請求,或是其他的需要迴圈處理的任務。在這種情況下,一般是將這些任務放在一個迴圈中,如while迴圈。如果想讓迴圈永遠執行下去,可以使用while(true){……}來處理。但要想使while迴圈在某一特定條件下退出,最直接的方法就是設一個boolean型別的標誌,並通過設定這個標誌為true或false來控制while迴圈是否退出。

public class ThreadFlag implements Runnable{
    private volatile boolean exit=false;

    @Override
    public void run() {
        while (!exit){
            ///do something
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace
(); } } System.out.println("-----------ThreadFlag shutdown----------"); } public static void main(String[] args) throws InterruptedException { ThreadFlag threadFlag=new ThreadFlag(); Thread thread=new Thread(threadFlag); thread.start(); Thread.sleep(3000); threadFlag.exit=true; thread.join(); System.out.println("執行緒退出"); } }

上面程式碼使用了一個執行緒標誌位來判斷執行緒是否關閉.通過對執行緒標誌位進行操作來決定執行緒是否關閉.

1.2 使用stop方法終止執行緒

使用stop方法可以強行終止正在執行或掛起的執行緒。我們可以使用如下的程式碼來終止執行緒:

public class ThreadStop implements Runnable {
    @Override
    public void run() {
        try {
            while (true){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            System.out.println("-----------ThreadStop shutdown----------");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadStop threadStop=new ThreadStop();
        Thread thread=new Thread(threadStop);
        thread.start();
        Thread.sleep(3000);
        thread.stop();
        System.out.println("執行緒退出");
    }
}

這種方法執行緒不安全,Java不建議使用這種stop方法關閉執行緒。

1.3 使用interrupt方法終止執行緒

使用interrupt方法來終端執行緒可分為兩種情況:

(1)執行緒處於阻塞狀態,如使用了sleep方法。

(2)使用while(!isInterrupted()){……}來判斷執行緒是否被中斷。

enum  FuntuionType{
    FunctionType1,
    FunctionType2,
}
public class ThreadInterrupt implements Runnable{
    private FuntuionType funtuionType;

    public ThreadInterrupt(FuntuionType funtuionType) {
        this.funtuionType = funtuionType;
    }

    @Override
    public void run() {
        switch (funtuionType){
            case FunctionType1:
                int i = 0;
                while (!Thread.interrupted()){
                    //do something
                    i++;
                }
                System.out.println("Thread.interrupted() shutdown");
                break;
            case FunctionType2:
                try {
                    Thread.sleep(50*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Sleep InterruptedException throws");
                break;
            default:
                break;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadInterrupt threadInterrupt=new ThreadInterrupt(FuntuionType.FunctionType2);
        Thread thread=new Thread(threadInterrupt);
        thread.start();
        Thread.sleep(2000);
        thread.interrupt();
        System.out.println("執行緒已經退出");
    }
}

2. 任務取消

Java中沒有一種安全的搶佔式方法來停止執行緒,因此就沒有安全的搶佔式方法來停止任務。下面我們就來介紹一中協作式的方式來取消一個任務。

2.1 取消標誌位

第一種方式就是設定某個“已請求取消”的標誌位,而任務週期性的檢視這個標誌位。如果設定了這個標誌位,那麼任務就提前結束。

public class PrimeGenerator implements Runnable{
    private final List<BigInteger> primes=new ArrayList<>();
    private volatile boolean cancelled = false;
    private volatile BigInteger p = BigInteger.ONE;
    @Override
    public void run() {

        while (!cancelled){
            //此方法返回一個整數大於該BigInteger的素數。
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel(){
        this.cancelled=true;
    }

    public synchronized List<BigInteger> get(){
        return new ArrayList<>(primes);
    }

    public static void main(String[] args) throws InterruptedException {
        PrimeGenerator primeGenerator=new PrimeGenerator();
        for (int i = 0; i < 10; i++) {
            Thread thread=new Thread(primeGenerator);
            thread.start();
        }

        Thread.sleep(2000);
        primeGenerator.cancel();
        for (BigInteger bigInteger : primeGenerator.get()) {
            System.out.println(bigInteger);
        }
    }
}

2.2 中斷 Interrupt

由於PrimeGenerator中的取消機制最終會使得素數的任務進行退出。但是如果使用這個方法中的任務呼叫了一個阻塞方法,列如BlockingQueue.put,那麼就會產生一個嚴重的問題————任務可能永遠不會檢查取消標誌。因此永遠不會結束。所以這個時候我們就採用中斷Interrupt來取消任務。

public class PrimeProducer implements Runnable {
    private final BlockingQueue<BigInteger> bigIntegers;
    private Thread thread;

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    public PrimeProducer(BlockingQueue<BigInteger> bigIntegers) {
        this.bigIntegers = bigIntegers;
    }

    private volatile BigInteger p = BigInteger.ONE;

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()){
                p = p.nextProbablePrime();
                bigIntegers.put(p.nextProbablePrime());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void cancel(){
        thread.interrupt();
    }

    public BlockingQueue<BigInteger> getBigIntegers() {
        return bigIntegers;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<BigInteger> bigIntegers=new LinkedBlockingQueue<>();
        PrimeProducer primeProducer=new PrimeProducer(bigIntegers);

        Thread thread=new Thread(primeProducer);
        primeProducer.setThread(thread);
        thread.start();
        
        Thread.sleep(1000);
        primeProducer.cancel();
        for (BigInteger bigInteger : primeProducer.getBigIntegers()) {
            System.out.println(bigInteger);
        }
    }
}

每個執行緒都有一個Boolean型別的中斷狀態。當中斷執行緒發生的時候,這個執行緒就把這個中斷狀態設定為true。咋Thread中包含了中斷執行緒以及查詢執行緒中斷狀態的方法。

Thrad中的中斷方法:

public class Thread{
    public void interrupt(){}
    public boolean isInterrupted(){}
    public static boolean interrupted(){}
}
  • interrupt()方法能夠中斷目標執行緒
  • isInterrupted方法能夠返回目標執行緒的中斷狀態
  • interrupted靜態方法能夠將清除當前執行緒的中斷狀態,並返回它之前的值,這也是清除中斷狀態的唯一方法

阻塞庫方法,比如Thread.sleep和Object.wait和Thread.join等,都會檢查執行緒何時中斷,並且在發生中斷時提前返回。他們在響應中斷時只需的操作包括:清除中斷狀態,丟擲InterruptExecption異常,表示阻塞操作由於中斷而提前結束。

當執行緒在非阻塞狀態下中斷時,它的中斷狀態將被設定為true,然後根據將取消的操作來檢查中斷狀態以判斷髮生了中斷。通過這樣的方法,中斷操作將變得有粘性————如果不觸發InterruptException,那麼中斷將一直保持,直到明確的清除中斷狀態。

總結:
對中斷的正確理解是:它並不會真正地中斷在一個正在執行的執行緒,而是發出中斷請求,然後由執行緒在下一個合適的時刻中斷自己。(這些合適的時刻稱為取消點)。有些方法,比如wait、slee和join等,將嚴格處理這種請求,當他們收到中斷請求或者開始執行發現某個已經被設定好的中斷狀態的時候,將丟擲一個異常。
中斷策略

最合理的中斷策略使某
種執行緒級(Thread-Level)取消操作或者服務級的取消操作:儘快退出,在必要時候進行清理,通知某個所有執行緒已經退出。當前檢查到中斷請求時候,任務不需要放棄所有的操作————它可以推遲處理中斷請求,並找到合適的時刻。因此需要記住中斷請求,並在完成當前任務之後丟擲InterruptExeception或者表示已經收到中斷請求。這項技術能夠確保在更新過程中發生中斷時,資料結構不會發生破壞。除了將InterruptException傳遞給呼叫者外還需要執行額外的操作,那麼應該在捕獲InterruptException之後恢復中斷狀態:

Thread.currentThread().interrupt();

響應中斷

當中斷異常發生的時候,我們有兩種方式進行響應中斷請求:

  • 傳遞異常(可能在執行某個特定於任務的清除操作之後):從而使你的方法也可以是中斷的阻塞方法
  • 恢復中斷狀態:從而使呼叫佔中的上層程式碼能對其進行處理。如果不想處理中斷請求,一種標準的方法就是通過再次呼叫interrupt來恢復中斷狀態。你不能遮蔽中斷狀態,你只能恢復中斷狀態。

2.3 通過Future來實現取消

我們已經使用了一種抽象機制來管理任務的生命週期,處理異常,下面我們來介紹一種使用Future類來實現任務取消。

public class TimeRun {
    private static ExecutorService executorService= Executors.newFixedThreadPool(5);
    
    public static void timeRun(Runnable runnable, long timeout, TimeUnit timeUnit){
        Future<?> submit = executorService.submit(runnable);
        try {
            submit.get(timeout,timeUnit);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            //接下來任務將被取消
            e.printStackTrace();
        } finally {
            //如果任務已經結束,那麼執行取消操作也不會帶來任何影響
            //如果任務正在執行,那麼將會被中斷
            submit.cancel(true);
        }
    }
}

當Future.get丟擲InterruptException或者TimeoutException時,如果你知道不再需要結果,那麼就可以呼叫Future.cancel來取消任務。

2.4 處理不可中斷的阻塞

如果一個執行緒由於執行同步的Socket I/O 或者等待獲得內建鎖而阻塞,那麼中斷請求只能設定執行緒的中斷狀態,除此之外沒有其他任何作用。對於那些由於執行不可中斷操作而被阻塞的執行緒,可以使用類似於中斷的手段來停止這些執行緒。

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream inputStream;
    public static final int BUFSIZE=1024;
    
    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.inputStream=socket.getInputStream();
    }

    @Override
    public void interrupt() {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            super.interrupt();
        }
    }

    @Override
    public void run() {
        byte [] buf=new byte[BUFSIZE];
        try {
            while (true) {
                int count= inputStream.read(buf);
                //dosomething
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

通過重寫Interrupt方法將非標準的取消操作封裝到Thread中,實現中斷功能

3. 停止執行緒的服務

正確的封裝原則是:除非擁有某個執行緒,否則不能對執行緒進行操作。比如,中斷執行緒,或者修改執行緒的優先順序等等。

服務應該提供生命週期方法(Lifecycle Method)來關閉它自己以及它所擁有的執行緒。這樣應用程式關閉服務的時候,服務就可以關閉所有執行緒了。對於持有執行緒的服務,只要服務的存在時間大於建立執行緒的方法的存在時間,就應該提供生命週期方法。

3.1 關閉ExecutorService

關閉ExecutorService提供了兩種關閉方法:shutdown和shutdownNow方法

強行關閉的速度更快,但是風險也更大,因為任務很可能執行到一半就結束
正常關閉的速度雖然慢,但是卻更為安全,因為ExecutorService會一直等到佇列中的所有任務都執行完成之後才關閉。

4. JVM關閉

JVM關閉應用程式可以分為兩種方式:

正常關閉:當最後一個“正常(非守護)”執行緒結束的時候,或者呼叫了System.exit(0)時,或者通過其他特定於平臺的方法關閉(比如發出了SIGNT訊號或者Ctrl-C)
強行關閉:通過呼叫Runtime.halt或者在作業系統中“殺死”JVM程序來強行關閉JVM

4.1 關閉鉤子

在正常關閉中,JVM首先呼叫以及註冊的關閉鉤子(shutdown Hook)。關閉鉤子是指通過Runntime.addShutdwonHook註冊的但是尚未開始的執行緒。JVM不能保證這些執行緒的執行順序。在關閉應用程式執行緒時,如果有執行緒正在執行,那麼這些執行緒接下來將於關閉程序併發執行。

public class JavaHook {
    private static class JavaTask implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("-----------JavaTask shutdown----------");
        }
    }

    public static void main(String[] args) {
        JavaTask javaTask=new JavaTask();
        Thread thread=new Thread(javaTask);
        thread.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----------JavaHook finish----------");
            }
        }));
        System.out.println("JVM finsih....");
    }
}

關閉鉤子應該是執行緒安全的:他們在訪問共享資料的時候必須使用同步機制,並且小心的避免發生死鎖,這與其他併發程式碼的要求相同。而且關閉鉤子不應該對應用程式的狀態(比如其他服務是否已經關閉,後者所有的正常線是否已經執行完成)後者JVm的關閉原因作出任何假設。

5.總結

Java並沒有提供某種搶佔式的機制來取消或者終結執行緒。想法它提供一種協作式的中斷機制來實現取消操作,但是這要依賴於如何構建取消操作的協議,以及能否遵循這些協議。