1. 程式人生 > >Java多線程編程模式實戰指南(三):Two-phase Termination模式

Java多線程編程模式實戰指南(三):Two-phase Termination模式

增加 row throws mgr 額外 finally join table 還需

停止線程是一個目標簡單而實現卻不那麽簡單的任務。首先,Java沒有提供直接的API用於停止線程。此外,停止線程時還有一些額外的細節需要考慮,如待停止的線程處於阻塞(等待鎖)或者等待狀態(等待其它線程)、尚有未處理完的任務等。本文介紹的Two-phase Termination模式提供了一種通用的用於優雅地停止線程的方法。

Two-phase Termination模式簡介

Java並沒有提供直接的API用於停止線程。Two-phase Termination模式通過將停止線程這個動作分解為準備階段和執行階段這兩個階段,以應對停止線程過程中可能存在的問題。

準備階段。該階段主要動作是“通知”目標線程(欲停止的線程)準備進行停止。這一步會設置一個標誌變量用於指示目標線程可以準備停止了。但是,由於目標線程可能正處於阻塞狀態(等待鎖的獲得)、等待狀態(如調用Object.wait)或者I/O(如InputStream.read)等待等狀態,即便設置了這個標誌,目標線程也無法立即“看到”這個標誌而做出相應動作。因此,這一階段還需要通過調用目標線程的interrupt方法,以期望目標線程能夠通過捕獲相關的異常偵測到該方法調用,從而中斷其阻塞狀態、等待狀態。對於能夠對interrupt方法調用作出響應的方法(參見表1),目標線程代碼可以通過捕獲這些方法拋出的InterruptedException來偵測線程停止信號。但也有一些方法(如InputStream.read)並不對interrupt調用作出響應,此時需要我們手工處理,如同步的Socket I/O操作中通過關閉socket,使處於I/O等待的socket拋出java.net.SocketException。

表 1. 能夠對Thread.interrupt作出響應的一些方法

              方法                            響應interrupt調用拋出的異常

Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos)     InterruptedException

Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos)              InterruptedException

Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos)        InterruptedException

java.util.concurrent.BlockingQueue.take()                        InterruptedException

java.util.concurrent.locks.Lock.lockInterruptibly()                     InterruptedException

java.nio.channels.InterruptibleChannel                          java.nio.channels.ClosedByInterruptException

執行階段。該階段的主要動作是檢查準備階段所設置的線程停止標誌和信號,在此基礎上決定線程停止的時機,並進行適當的“清理”操作。

Two-phase Termination模式的架構

Two-phase Termination模式的主要參與者有以下幾種。其類圖如圖1所示。

圖 1. Two-phase Termination模式的類圖

技術分享

  • ThreadOwner:目標線程的擁有者。Java語言中,並沒有線程的擁有者的概念,但是線程的背後是其要處理的任務或者其所提供的服務,因此我們不能在不清楚某個線程具體是做什麽的情況下貿然將其停止。一般地,我們可以將目標線程的創建者視為該線程的擁有者,並假定其“知道”目標線程的工作內容,可以安全地停止目標線程。
  • TerminatableThread:可停止的線程。其主要方法及職責如下:
    • terminate:設置線程停止標誌,並發送停止“信號”給目標線程。
    • doTerminate:留給子類實現線程停止時所需的一些額外操作,如目標線程代碼中包含Socket I/O,子類可以在該方法中關閉Socket以達到快速停止線程,而不會使目標線程等待I/O完成才能偵測到線程停止標記。
    • doRun:留給子類實現線程的處理邏輯。相當於Thread.run,只不過該方法中無需關心停止線程的邏輯,因為這個邏輯已經被封裝在TerminatableThread的run方法中了。
    • doCleanup:留給子類實現線程停止後可能需要的一些清理動作。
  • TerminationToken:線程停止標誌。toShutdown用於指示目標線程可以停止了。reservations可用於反映目標線程還有多少數量未完成的任務,以支持等目標線程處理完其任務後再行停止。

準備階段的序列圖如圖2所示:

圖 2. 準備階段的序列圖

技術分享

1、客戶端代碼調用線程擁有者的shutdown方法。

2、shutdown方法調用目標線程的terminate方法。

3~4、terminate方法將terminationToken的toShutdown標誌設置為true。

5、terminate方法調用由TerminatableThread子類實現的doTerminate方法,使得子類可以為停止目標線程做一些其它必要的操作。

6、若terminationToken的reservations屬性值為0,則表示目標線程沒有未處理完的任務或者ThreadOwner在停止線程時不關心其是否有未處理的任務。此時,terminate方法會調用目標線程的interrupt方法。

7、terminate方法調用結束。

8、shutdown調用返回,此時目標線程可能還仍然在運行。

執行階段由目標線程的代碼去檢查terminationToken的toShutdown屬性、reservations屬性的值,並捕獲由interrupt方法調用拋出的相關異常以決定是否停止線程。在線程停止前由TerminatableThread子類實現的doCleanup方法會被調用。

Two-phase Termination模式實戰案例

某系統需要對接告警系統以實現告警功能。告警系統是一個C/S結構的系統,它提供了一套客戶端API(AlarmAgent)用於與其對接的系統給其發送告警。該系統將告警功能封裝在一個名為AlarmMgr的單件類(Singleton)中,系統中其它代碼需要發送告警的只需要調用該類的sendAlarm方法。該方法將告警信息緩存入隊列,由專門的告警發送線程負責調用AlarmAgent的相關方法將告警信息發送至告警服務器。

告警發送線程是一個用戶線程(User Thread),因此在系統的停止過程中,該線程若未停止則會阻止JVM正常關閉。所以,在系統停止過程中我們必須主動去停止告警發送線程,而非依賴JVM。為了能夠盡可能快的以優雅的方式將告警發送線程停止,我們需要處理以下兩個問題:

  1. 當告警緩存隊列非空時,需要將隊列中已有的告警信息發送至告警服務器。
  2. 由於緩存告警信息的隊列是一個阻塞隊列(LinkedBlockingQueue),在該隊列為空的情況下,告警發送線程會一直處於等待狀態。這會導致其無法響應我們的關閉線程的請求。

上述問題可以通過使用Two-phase Termination模式來解決。

AlarmMgr相當於圖1中的ThreadOwner參與者實例,它是告警發送線程的擁有者。系統停止過程中調用其shutdown方法(AlarmMgr.getInstance().shutdown())即可請求告警發送線程停止。其代碼如清單1所示:

清單 1. AlarmMgr源碼

技術分享
public class AlarmMgr {
    private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();
    //告警系統客戶端API
    private final AlarmAgent alarmAgent = new AlarmAgent();
    //告警發送線程
    private final AbstractTerminatableThread alarmSendingThread;

    private boolean shutdownRequested = false;

    private static final AlarmMgr INSTANCE = new AlarmMgr();

    private AlarmMgr() {
        alarmSendingThread = new AbstractTerminatableThread() {
            @Override
            protected void doRun() throws Exception {
                if (alarmAgent.waitUntilConnected()) {
                    AlarmInfo alarm;
                    alarm = alarms.take();
                    terminationToken.reservations.decrementAndGet();
                    try {
                        alarmAgent.sendAlarm(alarm);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            @Override
            protected void doCleanup(Exception exp) {
                if (null != exp) {
                    exp.printStackTrace();
                }
                alarmAgent.disconnect();
            }

        };

        alarmAgent.init();
    }

    public static AlarmMgr getInstance() {
        return INSTANCE;
    }

    public void sendAlarm(AlarmType type, String id, String extraInfo) {
        final TerminationToken terminationToken = alarmSendingThread.terminationToken;
        if (terminationToken.isToShutdown()) {
            // log the alarm
            System.err.println("rejected alarm:" + id + "," + extraInfo);
            return;

        }
        try {
            AlarmInfo alarm = new AlarmInfo(id, type);
            alarm.setExtraInfo(extraInfo);
            terminationToken.reservations.incrementAndGet();
            alarms.add(alarm);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public void init() {
        alarmSendingThread.start();
    }

    public synchronized void shutdown() {
        if (shutdownRequested) {
            throw new IllegalStateException("shutdown already requested!");
        }
        
        alarmSendingThread.terminate();
        shutdownRequested = true;
    }

    public int pendingAlarms() {
        return alarmSendingThread.terminationToken.reservations.get();
    }

}

class AlarmAgent {
    // 省略其它代碼
    private volatile boolean connectedToServer = false;

    public void sendAlarm(AlarmInfo alarm) throws Exception {
        // 省略其它代碼
        System.out.println("Sending " + alarm);
        try {
            Thread.sleep(50);
        } catch (Exception e) {

        }
    }

    public void init() {
        // 省略其它代碼
        connectedToServer = true;
    }

    public void disconnect() {
        // 省略其它代碼
        System.out.println("disconnected from alarm server.");
    }

    public boolean waitUntilConnected() {
        // 省略其它代碼
        return connectedToServer;
    }
}
View Code

從上面的代碼可以看出,AlarmMgr每接受一個告警信息放入緩存隊列便將terminationToken的reservations值增加1,而告警發送線程每發送一個告警到告警服務器則將terminationToken的reservations值減少1。這為我們可以在停止告警發送線程前確保隊列中現有的告警信息會被處理完畢提供了線索:AbstractTerminatableThread的run方法會根據terminationToken的reservations是否為0來判斷待停止的線程已無未處理的任務,或者無需關心其是否有待處理的任務。

AbstractTerminatableThread的源碼見清單2:

清單 2. AbstractTerminatableThread源碼

技術分享
public abstract class AbstractTerminatableThread extends Thread 
     implements Terminatable {
    public final TerminationToken terminationToken;

    public AbstractTerminatableThread() {
        super();
        this.terminationToken = new TerminationToken();
    }

      /**
     * 
     * @param terminationToken 線程間共享的線程終止標誌實例
    */
    public AbstractTerminatableThread(TerminationToken terminationToken) {
        super();
        this.terminationToken = terminationToken;
    }

    protected abstract void doRun() throws Exception;

    protected void doCleanup(Exception cause) {}

    protected void doTerminiate() {}

    @Override
    public void run() {
        Exception ex = null;
        try {
            while (true) {
                                    /*
                 * 在執行線程的處理邏輯前先判斷線程停止的標誌。
                 */
                if (terminationToken.isToShutdown()
                    && terminationToken.reservations.get() <= 0) {
                    break;
                }
                doRun();
            }

        } catch (Exception e) {
            // Allow the thread to terminate in response of a interrupt invocation
            ex = e;
        } finally {
            doCleanup(ex);
        }
    }

    @Override
    public void interrupt() {
        terminate();
    }

    @Override
    public void terminate() {
        terminationToken.setToShutdown(true);
        try {
            doTerminiate();
        } finally {
            // 若無待處理的任務,則試圖強制終止線程
            if (terminationToken.reservations.get() <= 0) {
                super.interrupt();
            }
        }
    }

}
View Code

AbstractTerminatableThread是一個可復用的TerminatableThread參與者實例。其terminate方法完成了線程停止的準備階段。該方法首先將terminationToken的toShutdown變量設置為true,指示目標線程可以準備停止了。但是,此時目標線程可能處於一些阻塞(Blocking)方法的調用,如調用Object.sleep、InputStream.read等,無法偵測到該變量。調用目標線程的interrupt方法可以使一些阻塞方法(參見表1)通過拋出異常從而使目標線程停止。但也有些阻塞方法如InputStream.read並不對interrupt方法調用作出響應,此時需要由TerminatableThread的子類實現doTerminiate方法,在該方法中實現一些關閉目標線程所需的額外操作。例如,在Socket同步I/O中通過關閉socket使得使用該socket的線程若處於I/O等待會拋出SocketException。因此,terminate方法下一步調用doTerminate方法。接著,若terminationToken.reservations的值為非正數(表示目標線程無待處理任務、或者我們不關心其是否有待處理任務),則terminate方法會調用目標線程的interrupt方法,強制目標線程的阻塞方法中斷,從而強制終止目標線程。

執行階段在AbstractTerminatableThread的run方法中完成。該方法通過對TerminationToken的toShutdown屬性和reservations屬性的判斷或者通過捕獲由interrupt方法調用而拋出的異常來終止線程。並在線程終止前調用由TerminatableThread子類實現的doCleanup方法用於執行一些清理動作。

在執行階段,由於AbstractTerminatableThread.run方法每次執行線程處理邏輯(通過調用doRun方法實現)前都先判斷下toShutdown屬性和reservations屬性的值,在目標線程處理完其待處理的任務後(此時reservations屬性的值為非正數)目標線程run方法也就退出了while循環。因此,線程的處理邏輯代碼(doRun方法)將不再被調用,從而使本案例在不使用Two-phase Termination模式的情況下停止目標線程存在的兩個問題得以解決(目標線程停止前可以保證其處理完待處理的任務——發送隊列中現有的告警信息到服務器)和規避(目標線程發送完隊列中現有的告警信息後,doRun方法不再被調用,從而避免了隊列為空時BlockingQueue.take調用導致的阻塞)。

從上可知,準備階段、執行階段需要通過TerminationToken作為“中介”來協調二者的動作。TerminationToken的源碼如清單3所示:

清單 3. TerminationToken源碼

技術分享
public class TerminationToken {
        //使用volatile修飾,以保證無需顯示鎖的情況下該變量的內存可見性
    protected volatile boolean toShutdown = false;
    public final AtomicInteger reservations = new AtomicInteger(0);

    public boolean isToShutdown() {
        return toShutdown;
    }

    protected void setToShutdown(boolean toShutdown) {
        this.toShutdown = true;
    }

}
View Code

Two-phase Termination模式的評價與實現考量

Two-phase Termination模式使得我們可以對各種形式的目標線程進行優雅的停止。如目標線程調用了能夠對interrupt方法調用作出響應的阻塞方法、目標線程調用了不能對interrupt方法調用作出響應的阻塞方法、目標線程作為消費者處理其它線程生產的“產品”在其停止前需要處理完現有“產品”等。Two-phase Termination模式實現的線程停止可能出現延遲,即客戶端代碼調用完ThreadOwner.shutdown後,該線程可能仍在運行。

本文案例展示了一個可復用的Two-phase Termination模式實現代碼。讀者若要自行實現該模式,可能需要註意以下幾個問題。

線程停止標誌

本文案例使用了TerminationToken作為目標線程可以準備停止的標誌。從清單3的代碼我們可以看到,TerminationToken使用了toShutdown這個boolean變量作為主要的停止標誌,而非使用Thread.isInterrupted()。這是因為,調用目標線程的interrupt方法無法保證目標線程的isInterrupted()方法返回值為true:目標線程可能調用一些能夠捕獲InterruptedException而不保留線程中斷狀態的代碼。另外,toShutdown這個變量為了保證內存可見性而又能避免使用顯式鎖的開銷,采用了volatile修飾。這點也很重要,筆者曾經見過一些采用boolean變量作為線程停止標誌的代碼,只是這些變量沒有用volatile修飾,對其訪問也沒有加鎖,這就可能無法停止目標線程。

生產者——消費者問題中的線程停止

在多線程編程中,許多問題和一些多線程編程模式都可以看作生產者——消費者問題。停止處於生產者——消費者問題中的線程,需要考慮更多的問題:需要註意線程的停止順序,如果消費者線程比生產者線程先停止則會導致生產者生產的新”產品“無法被處理,而如果先停止生產者線程又可能使消費者線程處於空等待(如生產者消費者采用阻塞隊列中轉”產品“)。並且,停止消費者線程前是否考慮要等待其處理完所有待處理的任務或者將這些任務做個備份也是個問題。本文案例部分地展示生產者——消費者問題中線程停止的處理,其核心就是通過使用TerminationToken的reservations變量:生產者每”生產“一個產品,Two-phase Termination模式的調用方代碼要使reservations變量值增加1(terminationToken.reservations.incrementAndGet());消費者線程每處理一個產品,Two-phase Termination模式的調用方代碼要使reservations變量值減少1(terminationToken.reservations.decrementAndGet())。當然,在停止消費者線程時如果我們不關心其待處理的任務,Two-phase Termination模式的調用方代碼可以忽略對reservations變量的操作。清單4展示了一個完整的停止生產者——消費者問題中的線程的例子:

清單 4. 停止生產者——消費者問題中的線程的例子

技術分享
public class ProducerConsumerStop {
    class SampleConsumer<P> {
        private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();

        private AbstractTerminatableThread workThread 
                = new AbstractTerminatableThread() {
            @Override
            protected void doRun() throws Exception {
                terminationToken.reservations.decrementAndGet();
                P product = queue.take();
                // ...
                System.out.println(product);
            }

        };

        public void placeProduct(P product) {
            if (workThread.terminationToken.isToShutdown()) {
                throw new IllegalStateException("Thread shutdown");
            }
            try {
                queue.put(product);
                workThread.terminationToken.reservations.incrementAndGet();
            } catch (InterruptedException e) {

            }
        }

        public void shutdown() {
            workThread.terminate();
        }

        public void start() {
            workThread.start();
        }
    }

    public void test() {
        final SampleConsumer<String> aConsumer = new SampleConsumer<String>();

        AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {
            private int i = 0;

            @Override
            protected void doRun() throws Exception {
                aConsumer.placeProduct(String.valueOf(i));
            }

            @Override
            protected void doCleanup(Exception cause) {
                // 生產者線程停止完畢後再請求停止消費者線程
                aConsumer.shutdown();
            }

        };

        aProducer.start();
        aConsumer.start();
    }
}
View Code

隱藏而非暴露可停止的線程

為了保證可停止的線程不被其它代碼誤停止,一般我們將可停止線程隱藏在線程擁有者背後,而使系統中其它代碼無法直接訪問該線程,正如本案例代碼(見清單1)所展示:AlarmMgr定義了一個private字段alarmSendingThread用於引用告警發送線程(可停止的線程),系統中的其它代碼只能通過調用AlarmMgr的shutdown方法來請求該線程停止,而非通過引用該線程對象自身來停止它。

總結

本文介紹了Two-phase Termination模式的意圖及架構。並結合筆者工作經歷提供了一個實際的案例用於展示一個可復用的Two-phase Termination模式實現代碼,在此基礎上對該模式進行了評價並分享在實際運用該模式時需要註意的事項。

Java多線程編程模式實戰指南(三):Two-phase Termination模式