1. 程式人生 > >《JAVA多執行緒程式設計實戰指南》之Two-phase Termination(兩階段終止)模式

《JAVA多執行緒程式設計實戰指南》之Two-phase Termination(兩階段終止)模式

本文是《JAVA多執行緒程式設計實戰指南》的樣章,感謝作者授權併發網(ifeve.com)發表此文。感謝demochen整理此文。

5.1Two-phase Termination模式簡介

停止執行緒是一個目標簡單而實現卻不那麼簡單的任務。首先,Java沒有提供直接的API用於停止執行緒。此外,停止執行緒還有一些額外的細節需要考慮,如停止的執行緒處於阻塞(如等待鎖)或者等待狀態(等待其他執行緒),尚有未處理完的任務等。
Two-phase Termination模式通過將停止執行緒這個動作分解為準備階段和執行階段這兩個階段,提供了一種通用的用於優雅地停止執行緒的方法。
準備階段。該階段的主要動作是“通知”目標執行緒(欲停止的執行緒)準備進行停止。這一步會設定一個標誌變數用於指示目標執行緒可與準備停止了。但是,由於目標執行緒可能正處於阻塞狀態(等待鎖的獲得)、等待狀態(如呼叫Object.wait)或者I/O(如InputStream.read)等待等狀態,即便設定了這個標誌,目標執行緒也無法立即”看到”這個標誌而做出相應的動作。因此,這一階段還需要用過interrupt方法,以期望目標執行緒能夠對能夠通過捕獲相關的異常偵測到該方法呼叫,從而中斷其阻塞狀態、等待狀態。對於能夠對interrupt方法呼叫做出響應的方法(參見表5-1),目標執行緒程式碼可以通過捕獲這些方法丟擲的InterruptedException來偵測執行緒停止訊號。但也有一些方法(如InputStream.read)並不對interrupt呼叫作出響應,此時需要我們手工處理,如同步的Socket I/O操作中通過關閉socket,使處於I/O等待的socket丟擲java.net.SocketException。


表5-1.能夠對Thread.interrupt作出響應的一些方法

方法(或者類) 響應interrupt呼叫丟擲的異常
Object.wait() ǃObject.wait(long timeout) ǃObject.wait
(long timeout, int nanos)
InterruptedException
Thread.sleep(long millis) ǃThread.sleep(long millis, int
nanos)
InterruptedException
Thread.join()ǃThread.join(long millis) ǃThread.Join
(long millis, int nanos)
InterruptedException
java.util.concurrent.BlockingQueue.take() InterruptedException
java.util.concurrent.locks.Lock.lockInterruptibly() InterruptedException
java.nio.channels.InterruptibleChannel java.nio.channels.ClosedByInterruptException

執行階段。該階段的主要動作是檢查準備階段所設定的執行緒停止標誌和訊號,在此基礎上決定執行緒停止的時機,並進行適當的”清理”操作。

5.2Two-phase Termination 模式的架構

Two-phase Termination模式的主要參與者有以下幾種。其類圖如圖5-1所示

Fig5-1

ThreadOwner:目標執行緒的擁有者.Java語言中,並沒有執行緒擁有者的概念,但是執行緒的背後是其要處理的任務或者其所提供的服務,因此我們不能在不清楚某個執行緒具體是做什麼的情況下貿然將其停止。一般地,我們可以將目標執行緒的建立者視為該執行緒的擁有者,並假定其”知道”目標執行緒的工作內容,可以安全地停止目標執行緒。
Terminatable:可停止執行緒的抽象。其主要方法及職責如下
terminate:請求目標執行緒停止。
AbstractTerminatableThread:可停止的執行緒。其主要方法及職責如下
terminate:設定執行緒停止標誌,併發送停止”訊號”給目標執行緒。
doTerminate:留給自雷實現執行緒停止時所需的一些額外操作,如目標執行緒程式碼中包含SockerI/O,子類可以在該方法中關閉Socket以達到快速停止執行緒,而不會使目標執行緒等待I/O完成才能偵測到執行緒停止標記。
doRun:執行緒處理邏輯方法。留給子類實現執行緒的處理邏輯。相當於Thread.run(),只不過該方法中無需關心停止執行緒的邏輯,因為這個邏輯已經被封裝在TerminatableThread的run方法中了。
doCleanup:留給子類實現執行緒停止後可能需要的一些清理動作。
TerminationToken:執行緒停止標誌。toShutdown用於目標執行緒可以停止了。reservations可用於反映目標執行緒還有多少數量未完成的任務,以支援等目標執行緒處理完其他任務後再行停止。
ConcreteTerminatableThread:由應用自己實現的AbstractTerminatableThread參與者的實現類。該類需要實現其父類的doRun抽象方法,在其中實現執行緒的處理邏輯,並根據應用的實際需要覆蓋(Override)其父類的doTerminate方法、doCleanup方法。

準備階段的序列圖如圖5-2所示。

Fig5-2
第1步:客戶端程式碼呼叫執行緒擁有者的shutdown方法。
第2步:shutdown方法呼叫目標執行緒的terminate方法。
第3,4步:terminate方法將terminationToken的toShutdown標誌設定為true。
第5步:terminate方法呼叫由AbstractTerminatableThread子類實現的doTerminate的方法,使得子類可以為停止目標執行緒做一些其他必要的操作。
第6步:若terminationToken的reservations屬性值為0,則表示目標執行緒沒有未處理完的任務或者ThreadOwner在停止執行緒時不關心其是否有未處理的任務。此時,terminate方法會呼叫目標執行緒的interrupt方法。
第7步:terminate方法呼叫結束
第8步:shutdown呼叫返回,此時目標執行緒可能還仍然在執行。
執行階段由目標執行緒的run方法去檢查terminationToken的toShutdown屬性、reservations屬性的值,並捕獲由interrupt方法呼叫丟擲的相關異常以決定是否停止執行緒。線上程停止前由AbstractTerminatableThread子類實現的doCleanup方法會被呼叫。

5.3Two-phase Termination模式實戰案例解析

某系統的告警功能被封裝在一個模組中。告警模組的入口類是AlarmMgr。其他模組(業務模組)需要傳送告警資訊時只需要呼叫AlarmMgr的sendAlarm方法即可。該方法將告警資訊快取如佇列,由專門的告警傳送執行緒負責呼叫AlarmAgent的相關方法傳送告警。AlarmAgent類負責與告警伺服器對接,它通過網路連線將告警資訊傳送至告警伺服器。
告警傳送執行緒是一個使用者執行緒(user Thread),因此在系統的停止過程中,該執行緒若未停止則會組織JVM正常關閉。所以,在系統停止過程中我們必須主動去停止告警傳送執行緒,而非依賴JVM。為了能夠儘快地以優雅的方式將告警執行緒停止,我們需要處理以下兩個問題。
1.當告警快取佇列非空時,需要將佇列中已有的告警資訊傳送至告警伺服器。
2.由於快取告警資訊的佇列是一個阻塞佇列(ArrayBlockingQueue),在該佇列為空的情況下,告警傳送執行緒會一直處於等待狀態。這會導致其無法響應我們關閉執行緒的請求。
上述問題可以通過使用Two-phase Termination模式來解決。
AlarmMgr相當於圖5-1中的ThreadOwner參與者例項,它是告警傳送執行緒(對應例項變數alarmSendingThread)的擁有者。系統停止過程中呼叫其shutdown方法(AlarmMgr.getInstance().shutdown())即可請求告警傳送執行緒停止。其程式碼如清單5-1所示
清單5-1.AlarmMgr類原始碼

/**
* 告警功能入口類 模式角色: Two-phaseTermination.ThreadOwner
*/
public class AlarmMgr {

    // 儲存AlarMgr類的唯一例項
    private static final AlarmMgr    INSTANCE          = new AlarmMgr();

    private volatile boolean         shutdownRequested = false;
    // 告警傳送執行緒
    private final AlarmSendingThread alarmSendingThread;

    // 私有構造器
    private AlarmMgr() {
        alarmSendingThread = new AlarmSendingThread();
    }

    public static AlarmMgr getInstance() {
        return INSTANCE;
    }

    /**
    * 傳送告警
    *
    * @Description: TODO(這裡用一句話描述這個方法的作用)
    * @param type告警型別
    * @param id告警編號
    * @param extraInfo告警引數
    * @return 由type+id+extraInfo唯一確定的告警資訊提交的次數。-1表示告警管理器已被關閉
    */
    public int sendAlarm(AlarmType type, String id, String extraInfo) {
        Debug.info("Trigger alarm " + type + "," + id + ',' + extraInfo);
        int duplicateSubmissionCount = 0;
        try {
            AlarmInfo alarmInfo = new AlarmInfo(id, type);
            alarmInfo.setExtraInfo(extraInfo);
            duplicateSubmissionCount = alarmSendingThread.sendAlarm(alarmInfo);
        } catch (Throwable t) {
            t.printStackTrace();
        }
        return duplicateSubmissionCount;
    }

    public void init() {
        alarmSendingThread.start();
    }

    public synchronized void shutdown() {
        if (shutdownRequested) {
            throw new IllegalStateException("shutdown already requested!");
        }
        alarmSendingThread.terminate();
        shutdownRequested = true;
    }

}

告警傳送執行緒類AlarmSendingThread 的原始碼,如清單5-2所示。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

//模式角色:Two-phaseTermination.ConcreteTerminatableThread
public class AlarmSendingThread extends AbstractTerminatableThread {

private final AlarmAgent alarmAgent = new AlarmAgent();
//告警佇列
private final BlockingQueue alarmQueues;
private final ConcurrentMap<String, AtomicInteger>; submittedAlarmRegistry;

public AlarmSending alarmQueue=new ArThread(){
//rayBlockingQueue(100);
submittedAlarmRegistry = new ConcurrentHashMap<String, AtomicInteger>();
alarmAgent.init();
}

@Override
protected void doRun() throws Exception {
AlarmInfo alarm; alarm = alarmQueue.take();
terminationToken.reservations.decrementAndGet();
try {
//將告警資訊傳送至告警伺服器
alarmAgent.sendAlarm(alarm);
} catch (Exception e) {
e.printStackTrace();
}
if (AlarmType.RESUME == alarm.type) {
String key = AlarmType.FAULT.toString() + ':' + alarm.getId() + '@' + alarm.getExtraInfo();
submittedAlarmRegistry.remove(key);
key = AlarmType.RESUME.toString() + ':' + alarm.getId() + '@' + alarm.getExtraInfo();
submittedAlarmRegistry.remove(key);
}
}

public int sendAlarm(final AlarmInfo alarmInfo){
AlarmType type = alarmInfo.type;
String id = alarmInfo.getId();
String extraInfo = alarmInfo.getExtraInfo();
if (terminationToken.isToShutdown()) {
System.err.println("rejected alarm:" + id + "," + extraInfo);
return -1;
}
int duplicateSubmissionCount = 0;
try {
AtomicInteger prevSubmittedCounter;
prevSubmittedCounter = submittedAlarmRegistry.putIfAbsent( type.toString()+ ':' + id + '@' + extraInfo, new AtomicInteger(0));
if (null == prevSubmittedCounter) {
terminationToken.reservations.incrementAndGet();
alarmQueue.put(alarmInfo);
}else{
//故障未恢復,不用重複傳送告警資訊個伺服器,故僅增加計數
duplicateSubmissionCount = prevSubmittedCounter.incrementAndGet();
}
} catch (Throwable t) {
t.printStackTrace();
}
return duplicateSubmissionCount;
}

@Override
protected void doCleanup(Exception exp) {
if (null != exp && !(exp instanceof InterruptedException)) {
exp.printStackTrace();
}
alarmAgent.disconnect();
}
}

從上面的程式碼可以看出,AlarmSendingThread每接受一個告警資訊放入快取佇列便將terminationToken 的reservations 值加1,而每傳送一個告警到告警伺服器則將terminationToken 的reservations值減少1.這為我們可以在停止告警傳送執行緒前確保佇列中現有的告警資訊會被處理完畢提供了線索:AbstractTerminatableThread的run方法會根據terminationToken 的reservations 是否為0來判斷待停止的執行緒已無未處理的任務,或者無需關係起是否有待處理的任務。

AbstractTerminatableThread 的原始碼見清單5-3

/**
* 可停止的抽象執行緒 模式角色:Two-phaseTermination.AbstractTerminatableThread
*
* @author Viscent Huang
* @date 2015年12月5日 下午8:38:17
*/

public abstract class AbstractTerminatableThread extends Thread implements Terminatable {

    // 模式角色:Two-phaseTermination.TerminationToken
    public final TerminationToken terminationToken;

    public AbstractTerminatableThread() {
        this(new TerminationToken());
    }

    /**
    * @param terminationToken 執行緒間共享的執行緒終止標誌例項
    */
    public AbstractTerminatableThread(TerminationToken terminationToken) {
        super();
        this.terminationToken = terminationToken;
        terminationToken.register(this);
    }

    /**
    * 留給子類實現其執行緒處理邏輯
    *
    * @throws Exception
    */
    protected abstract void doRun() throws Exception;

    /**
    * 留給子類實現。用於實現執行緒停止後的一些清理動作
    *
    * @param cause 設定檔案
    */
    protected void doCleanup(Exception cause) {
        // 什麼也不做
    }

    @Override
    public void run() {
        Exception ex = null;
        try {
            for (;;) {
                if (terminationToken.isToShutdown && terminationToken.reservations.get() <= 0) {
                    break;
                }
                doRun();
            }
        } catch (Exception e) {
            // 使得執行緒能夠響應interrupt呼叫而退出
            ex = e;
        } finally {
            try {
                doCleanup(ex);
            } finally {
                terminationToken.notifyThreadTermination(this);
            }
        }
    }

    @Override
    public void interrupt() {
        terminate();
    }

    /**
    * 請求停止執行緒
    *
    * @see io.github.viscent.mtpattern.tpt.Terminatable#terminate()
    */
    @Override
    public void terminate() {
        terminationToken.setToShutdown(true);
        try {
            doTerminiate();
        } finally {
            // 若無待處理的任務,則試圖強制終止執行緒
            if (terminationToken.reservations.get() <= 0) {
                super.interrupt();
            }
        }
    }

    public void terminate(boolean waitUtilThreadTerminated) {
        terminate();
        if (waitUtilThreadTerminated) {
            try {
                this.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

}

AbstractTerminatableThread是一個可複用的Terminatable 參與者例項。其terminate 方法完成了執行緒停止的準備階段。該方法首先將terminationToken 的toShutdown 屬性設定為true,指示目標執行緒可以準備停止了。但是,此時目標執行緒可能處於一些阻塞(Blocking)方法的呼叫,如 Object.sleep、ǃInputStream.read 等,無法檢測該變數的值。呼叫目標執行緒的interrupt 方法可以使一些阻塞方法(參見表5-1)丟擲異常從而是目標執行緒停止。但也有些阻塞方法如InputStream.read 並不對interrupt 方法呼叫作出響應,此時需要由AbstractTerminatableThread 的子類實現doTerminate 方法,在該方法中實現一些關閉目標執行緒所需的額外操作。例如,在Socket同步I/O中通過關閉socket使得使用該socket 的執行緒若處於I/O等待會丟擲SocketException 。因此,terminate方法下一步呼叫doTerminate 方法。接著,若terminationToken.reservations 的值作為非正數(表示目標執行緒無待處理任務或者我們不關心其是否有待處理任務),則terminate 方法會呼叫目標執行緒的interrupt 方法,強制目標執行緒的阻塞方法中斷,從而強制終止目標執行緒。

執行階段在AbstractTerminatableThread 的run方法中完成。該方法通過對TerminationToken 的toShutdown 屬性和reservations 屬性的判斷或者通過捕獲由interrupt 方法呼叫而丟擲的異常來終止執行緒,並在執行緒終止前呼叫由AbstractTerminatableThread 子類實現的doCleanup 方法用於執行一些清理動作。

在執行階段。由於AbstractTerminatableThread.run方法每次執行現場處理邏輯(通過呼叫doRun方法實現)前都先判斷下toShutDown屬性和reservations 的值,在目標執行緒處理完待處理的任務後(此時reservations 屬性的值為非整數)目標執行緒run方法也就退出了while迴圈。因此執行緒的處理邏輯方法將不再被呼叫,從而使本案例在不使用Two-phase Termination模式的情況下停止目標執行緒存在的兩個問題得以解決(目標執行緒停止前可以保證處理完待處理的任務-傳送佇列中現有的告警資訊到伺服器)和規避(目標執行緒傳送完佇列中現有的告警資訊後,doRun方法不再被呼叫,從而避免了佇列為空時 BlockingQueue.take 呼叫導致的阻塞)。

由上可知,準備階段、執行階段需要通過TerminationToken 作為“中介”來協調二者的動作。TerminationToken 的原始碼如清單5-4所示。

清單5-4 4 TerminationToken 類原始碼

/**
* 執行緒停止標誌
*
* @author  Viscent Huang
*/
public class TerminationToken {

    // 使用volatile修飾,以保證無須顯示鎖的情況下該變數的記憶體可見性
    protected volatile boolean         toShutdown   = false;

    public final AtomicInteger         reservations = new AtomicInteger(0);
    /***
    * 在多個可停止執行緒例項共享一個TerminationToken例項的情況下,
    * 該佇列用於記錄那些共享TerminationToken 例項的可停止執行緒,
    * 以便儘可能減少鎖的使用的情況下,實現這些執行緒的停止
    */
    private final Queue<WeakReference> coordinatedThreads;

    public TerminationToken() {
        coordinatedThreads = new ConcurrentLinkedQueue<WeakReference>();
    }

    public boolean isToShutdown() {
        return toShutdown;
    }

    protected void setToShutdown(boolean toShutdown) {
        this.toShutdown = true;
    }

    protected void register(Terminatable thread) {
        coordinatedThreads.add(new WeakReference(thread));
    }

    /**
    * 通知TerminationToken 例項:
    * 共享該例項的所有可停止執行緒中的一個執行緒停止了 以便其停止其他未被停止的執行緒
    *
    * @param thread 已停止的執行緒
    */
    protected void notifyThreadTermination(Terminatable thread) {
        WeakReference wrThread;
        Terminatable otherThread;
        while (null != (wrThread = coordinatedThreads.poll())) {
            otherThread = wrThread.get();
            if (null != otherThread && otherThread != thread) {
                otherThread.terminate();
            }
        }
    }

}

5.4  Two-phase Termination 模式的評價與實現考量

Two-phase Termination 模式使得我們可以對各種形式的目標執行緒進行優雅的停止。如目標執行緒呼叫了能夠對interrupt 方法呼叫作出響應的阻塞方法、目標執行緒呼叫了不能對interrupt 方法呼叫作出響應的阻塞方法、目標執行緒作為消費者處理其他執行緒生產的“產品”在其停止前需要處理完現有“產品”等。Two-phase Termination 模式實現的執行緒停止可能出現延遲,即客戶端程式碼呼叫完ThreadOwner.shutdown 後,該執行緒可能仍在執行。

本章案例展示了一個可複用的Two-phase Termination 模式實現程式碼。讀者若是要加深對該模式的理解或者自行實現該模式,需要注意以下幾個問題。

5.4.1執行緒停止標誌

本章案例使用了TerminationToken 作為目標執行緒可以準備停止的標誌。從清單5-4的程式碼我們可以看到,TerminationToken 使用了toShutdown 這個boolean 變數作為注意的停止標誌,而非使用Thread.isInterrupted()。這是因為,呼叫目標執行緒的interrupt方法無法保證目標執行緒的 isInterrupted() 方法返回值true:目標執行緒可能呼叫一些程式碼,它們捕獲InterruptedException 後沒有通過呼叫Thread.currentThread().interrupt()保留執行緒中斷狀態。另外,toShutdown 這個變數為了保證記憶體可見性而又能避免使用顯示鎖的開銷,採用了volatile修飾。這點也很重要,筆者曾經見過一些採用boolean變數作為執行緒停止標誌的程式碼,只是這些變數沒有用volatile修飾,對其訪問也沒有鎖,這就可能無法停止目標執行緒。

另外,某些場景下多個可停止執行緒例項可能需要共用一個執行緒停止標誌。例如,多個可停止執行緒例項“消耗”同一個佇列中的資料。當該佇列為空且不再有新的資料入佇列的時候,”消耗”該佇列資料的所有可停止執行緒都應該被停掉。AbstractTerminatableThread類(原始碼見清單5-3)的構造器支援傳入一個TerminationToken 例項就是為了支援這種場景。

5.4.2生產者-消費者問題中的執行緒停止

在多執行緒程式設計中,許多問題和一些多執行緒程式設計模式都可以看做生產者-消費者問題。停止處於生產者-消費者問題中的執行緒,需要考慮更多的問題:需要注意執行緒的停止順序。如果消費者執行緒比生產者執行緒先停止則會導致生產者生產新的”產品”無法被處理,而如果先停止生產者執行緒有可能使消費者執行緒處於空等待(如生產者、消費者採用阻塞佇列中轉”產品”)。並且,停止消費者執行緒是否考慮要等待其處理完所有待處理的任務或者將這些任務做個備份也是個問題。本章案例部分地展示生產者-消費者問題中執行緒停止的處理,其核心就是通過使用TerminationToken 的reservations 屬性:生產者每”生產”一個產品,Two-phase Termination 模式的客戶端程式碼要使reservations 屬性值增加1(即呼叫terminationToken.reservations.incrementAndGet());消費者執行緒每處理一個產品,該執行緒的執行緒處理邏輯方法 doRun 要使reservations 屬性值減少1(即呼叫terminationToken.reservations.decrementAndGet())。當然,在停止消費者執行緒時如果我們不關心其待處理的任務,Two-phase Termination模式的客戶端程式碼可以忽略對reservations 變數的操作。清單5-5展示了一個完整的停止生產者-消費者問題中的執行緒例子。

清單5-5 停止生產者-消費者問題中的執行緒的例子

public class SomeService {

    private final BlockingQueue queue    = new ArrayBlockingQueue(100);
    private final Producer      producer = new Producer();
    private final Consumer      consumer = new Consumer();

    private class Producer extends AbstractTerminatableThread {

        private int i = 0;

        @Override
        protected void doRun() throws Exception {
            queue.put(String.valueOf(i++));
            consumer.terminationToken.reservations.incrementAndGet();
        }
    };

    private class Consumer extends AbstractTerminatableThread {

        @Override
        protected void doRun() throws Exception {
            String product = queue.take();
            System.out.println("Processing product:" + product);
            // 模擬執行真正操作的時間消耗
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                ;
            } finally {
                terminationToken.reservations.decrementAndGet();
            }
        }
    }

    public void shutdown() {
        // 生產者執行緒停止後再停止消費者執行緒
        producer.terminate(true);
        consumer.terminate();
    }

    public static void main(String[] args) throws InterruptedException {
        SomeService ss = new SomeService();
        ss.init();
        Thread.sleep(500);
        ss.shutdown();
    }

    public void init() {
        producer.start();
        consumer.start();
    }

    public static void main(String[] args) throws InterruptedException {
        SomeService ss = new SomeService();
        ss.init();
        Thread.sleep(500);
        ss.shutdown();
    }

}

5.4.3 隱藏而非暴露可停止的執行緒

為了保證可停止的執行緒不被其他程式碼誤止,一般我們將可停止的執行緒隱藏線上程擁有者背後,而 使系統中其他程式碼無法直接訪問該執行緒,正如本案例程式碼(見清單5-1)所展示:AlarmMgr定義了一個private欄位alarmSendingThread用於引用告警傳送執行緒(可停止的執行緒),系統中的其他程式碼只能通過呼叫AlarmMgr 的shutdown 方法來請求該執行緒停止,而非通過使用該執行緒物件來停止它。

5.5Two-phase Termination 模式的可複用實現程式碼

本章案例程式碼(見清單5-3、清單5-4)所實現的 Two-phase Termination 模式的幾個參與者AbstractTerminatableThread 和TerminationToken 都是可複用的。在此基礎上,應用程式碼只需要在定義AbstractTerminatableThread 的子類(或匿名類)時實現doRun方法,在該方法中實現執行緒的處理邏輯。另外,應用程式碼如果需要在目標執行緒處理完完待處理的的任務後再停止,則需要注意TerminationToken 例項的reservations 屬性值的增加和減少。

5.6 Java標準庫例項

類java.util.concurrent.ThreadPoolExecutor就使用了˹ Two-phase Termination 模式來停止其內部維護的工作者執行緒。當客戶端程式碼呼叫ThreadPoolExecutor 例項的shutdown方法請求其關閉時,ThreadPoolExecutor 會先將其執行狀態設定為 SHUTDOWN。工作者執行緒的run方法會判斷其所屬的ThreadPoolExecutor 例項的執行狀態。若ThreadPoolExecutor 例項的執行狀態為SHUTDOWN,則工作者執行緒會一直取工作佇列中的任務進行執行,知道工作佇列為空時該工作者執行緒就停止了。可見,ThreadPoolExecutor 例項的停止過程也是分為準備階段(設定其執行狀態為SHUTDOWN)和執行階段(工作者佇列取空工作佇列中的任務,然後終止執行緒)。

5.7 相關模式

Two-phase Termination 模式是一個應用比較廣泛的基礎多執行緒設計模式。凡是涉及應用自身實現執行緒的程式碼,都可能需要使用該模式。

5.7.1 Producer-Consumer 模式(第7章)

Producer-Consumer 模式中,生產者執行緒、消費者執行緒的停止可能需要使用 Two-phase Termination模式。

5.8  參考資源

1.Brian Göetz et al.Java Concurrency In Practice.Addison Wesley,2006.

2.Mark Grand. Patterns in Java, Volume 1: A Catalog of Reusable Design Patterns Illustrated with UML, Second Edition.Wiley, 2002.

3.類 ThreadPoolExecutor 原始碼. http://www.docjar.com/html/api/java/util/concurrent/ Thread PoolExecutor.java.html.