1. 程式人生 > >Java多執行緒程式設計模式實戰指南(三):Two-phase Termination模式

Java多執行緒程式設計模式實戰指南(三):Two-phase Termination模式



本文由本人首次釋出在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination

停止執行緒是一個目標簡單而實現卻不那麼簡單的任務。首先,Java沒有提供直接的API用於停止執行緒。此外,停止執行緒時還有一些額外的細節需要考慮,如待停止的執行緒處於阻塞(等待鎖)或者等待狀態(等待其它執行緒)、尚有未處理完的任務等。本文介紹的Two-phase Termination模式提供了一種通用的用於優雅地停止執行緒的方法。

Two-phase Termination模式簡介

Java並沒有提供直接的API用於停止執行緒。Two-phase Termination模式通過將停止執行緒這個動作分解為準備階段和執行階段這兩個階段,以應對停止執行緒過程中可能存在的問題。

準備階段。該階段主要動作是“通知”目標執行緒(欲停止的執行緒)準備進行停止。這一步會設定一個標誌變數用於指示目標執行緒可以準備停止了。但是,由於目標執行緒可能正處於阻塞狀態(等待鎖的獲得)、等待狀態(如呼叫Object.wait)或者I/O(如InputStream.read)等待等狀態,即便設定了這個標誌,目標執行緒也無法立即“看到”這個標誌而做出相應動作。因此,這一階段還需要通過呼叫目標執行緒的interrupt方法,以期望目標執行緒能夠通過捕獲相關的異常偵測到該方法呼叫,從而中斷其阻塞狀態、等待狀態。對於能夠對interrupt方法呼叫作出響應的方法(參見表1),目標執行緒程式碼可以通過捕獲這些方法丟擲的InterruptedException來偵測執行緒停止訊號。但也有一些方法(如InputStream.read)並不對interrupt呼叫作出響應,此時需要我們手工處理,如同步的Socket I/O操作中通過關閉socket,使處於I/O等待的socket丟擲java.net.SocketException。

表 1. 能夠對Thread.interrupt作出響應的一些方法

方法

響應interrupt呼叫丟擲的異常

Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos)

InterruptedException

Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos)

InterruptedException

Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos)

InterruptedException

java.util.concurrent.BlockingQueue.take()

InterruptedException

java.util.concurrent.locks.Lock.lockInterruptibly()

InterruptedException

java.nio.channels.InterruptibleChannel

java.nio.channels.ClosedByInterruptException

執行階段。該階段的主要動作是檢查準備階段所設定的執行緒停止標誌和訊號,在此基礎上決定執行緒停止的時機,並進行適當的“清理”操作。

Two-phase Termination模式的架構

Two-phase Termination模式的主要參與者有以下幾種。其類圖如圖1所示。

圖 1. Two-phase Termination模式的類圖

  • ThreadOwner:目標執行緒的擁有者。Java語言中,並沒有執行緒的擁有者的概念,但是執行緒的背後是其要處理的任務或者其所提供的服務,因此我們不能在不清楚某個執行緒具體是做什麼的情況下貿然將其停止。一般地,我們可以將目標執行緒的建立者視為該執行緒的擁有者,並假定其“知道”目標執行緒的工作內容,可以安全地停止目標執行緒。
  • TerminatableThread:可停止的執行緒。其主要方法及職責如下:
    • terminate:設定執行緒停止標誌,併發送停止“訊號”給目標執行緒。
    • doTerminate:留給子類實現執行緒停止時所需的一些額外操作,如目標執行緒程式碼中包含Socket I/O,子類可以在該方法中關閉Socket以達到快速停止執行緒,而不會使目標執行緒等待I/O完成才能偵測到執行緒停止標記。
    • doRun:留給子類實現執行緒的處理邏輯。相當於Thread.run,只不過該方法中無需關心停止執行緒的邏輯,因為這個邏輯已經被封裝在TerminatableThread的run方法中了。
    • doCleanup:留給子類實現執行緒停止後可能需要的一些清理動作。
  • TerminationToken:執行緒停止標誌。toShutdown用於指示目標執行緒可以停止了。reservations可用於反映目標執行緒還有多少數量未完成的任務,以支援等目標執行緒處理完其任務後再行停止。

準備階段的序列圖如圖2所示:

圖 2. 準備階段的序列圖

1、客戶端程式碼呼叫執行緒擁有者的shutdown方法。

2、shutdown方法呼叫目標執行緒的terminate方法。

3~4、terminate方法將terminationToken的toShutdown標誌設定為true。

5、terminate方法呼叫由TerminatableThread子類實現的doTerminate方法,使得子類可以為停止目標執行緒做一些其它必要的操作。

6、若terminationToken的reservations屬性值為0,則表示目標執行緒沒有未處理完的任務或者ThreadOwner在停止執行緒時不關心其是否有未處理的任務。此時,terminate方法會呼叫目標執行緒的interrupt方法。

7、terminate方法呼叫結束。

8、shutdown呼叫返回,此時目標執行緒可能還仍然在執行。

執行階段由目標執行緒的程式碼去檢查terminationToken的toShutdown屬性、reservations屬性的值,並捕獲由interrupt方法呼叫丟擲的相關異常以決定是否停止執行緒。線上程停止前由TerminatableThread子類實現的doCleanup方法會被呼叫。

Two-phase Termination模式實戰案例

某系統需要對接告警系統以實現告警功能。告警系統是一個C/S結構的系統,它提供了一套客戶端API(AlarmAgent)用於與其對接的系統給其傳送告警。該系統將告警功能封裝在一個名為AlarmMgr的單件類(Singleton)中,系統中其它程式碼需要傳送告警的只需要呼叫該類的sendAlarm方法。該方法將告警資訊快取入佇列,由專門的告警傳送執行緒負責呼叫AlarmAgent的相關方法將告警資訊傳送至告警伺服器。

告警傳送執行緒是一個使用者執行緒(User Thread),因此在系統的停止過程中,該執行緒若未停止則會阻止JVM正常關閉。所以,在系統停止過程中我們必須主動去停止告警傳送執行緒,而非依賴JVM。為了能夠儘可能快的以優雅的方式將告警傳送執行緒停止,我們需要處理以下兩個問題:

  1. 當告警快取佇列非空時,需要將佇列中已有的告警資訊傳送至告警伺服器。
  2. 由於快取告警資訊的佇列是一個阻塞佇列(LinkedBlockingQueue),在該佇列為空的情況下,告警傳送執行緒會一直處於等待狀態。這會導致其無法響應我們的關閉執行緒的請求。

上述問題可以通過使用Two-phase Termination模式來解決。

AlarmMgr相當於圖1中的ThreadOwner參與者例項,它是告警傳送執行緒的擁有者。系統停止過程中呼叫其shutdown方法(AlarmMgr.getInstance().shutdown())即可請求告警傳送執行緒停止。其程式碼如清單1所示:

清單 1. AlarmMgr原始碼

public class AlarmMgr {
	private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();
	//告警系統客戶端API
	private final AlarmAgent alarmAgent = new AlarmAgent();
	//告警傳送執行緒
	private final AbstractTerminatableThread alarmSendingThread;

	private boolean shutdownRequested = false;

	private static final AlarmMgr INSTANCE = new AlarmMgr();

	private AlarmMgr() {
		alarmSendingThread = new AbstractTerminatableThread() {
			@Override
			protected void doRun() throws Exception {
				if (alarmAgent.waitUntilConnected()) {
					AlarmInfo alarm;
					alarm = alarms.take();
					terminationToken.reservations.decrementAndGet();
					try {
						alarmAgent.sendAlarm(alarm);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}

			@Override
			protected void doCleanup(Exception exp) {
				if (null != exp) {
					exp.printStackTrace();
				}
				alarmAgent.disconnect();
			}

		};

		alarmAgent.init();
	}

	public static AlarmMgr getInstance() {
		return INSTANCE;
	}

	public void sendAlarm(AlarmType type, String id, String extraInfo) {
		final TerminationToken terminationToken = alarmSendingThread.terminationToken;
		if (terminationToken.isToShutdown()) {
			// log the alarm
			System.err.println("rejected alarm:" + id + "," + extraInfo);
			return;

		}
		try {
			AlarmInfo alarm = new AlarmInfo(id, type);
			alarm.setExtraInfo(extraInfo);
			terminationToken.reservations.incrementAndGet();
			alarms.add(alarm);
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}

	public void init() {
		alarmSendingThread.start();
	}

	public synchronized void shutdown() {
		if (shutdownRequested) {
			throw new IllegalStateException("shutdown already requested!");
		}
		
		alarmSendingThread.terminate();
		shutdownRequested = true;
	}

	public int pendingAlarms() {
		return alarmSendingThread.terminationToken.reservations.get();
	}

}

class AlarmAgent {
	// 省略其它程式碼
	private volatile boolean connectedToServer = false;

	public void sendAlarm(AlarmInfo alarm) throws Exception {
		// 省略其它程式碼
		System.out.println("Sending " + alarm);
		try {
			Thread.sleep(50);
		} catch (Exception e) {

		}
	}

	public void init() {
		// 省略其它程式碼
		connectedToServer = true;
	}

	public void disconnect() {
		// 省略其它程式碼
		System.out.println("disconnected from alarm server.");
	}

	public boolean waitUntilConnected() {
		// 省略其它程式碼
		return connectedToServer;
	}
}

從上面的程式碼可以看出,AlarmMgr每接受一個告警資訊放入快取佇列便將terminationToken的reservations值增加1,而告警傳送執行緒每傳送一個告警到告警伺服器則將terminationToken的reservations值減少1。這為我們可以在停止告警傳送執行緒前確保佇列中現有的告警資訊會被處理完畢提供了線索:AbstractTerminatableThread的run方法會根據terminationToken的reservations是否為0來判斷待停止的執行緒已無未處理的任務,或者無需關心其是否有待處理的任務。

AbstractTerminatableThread的原始碼見清單2:

清單 2. AbstractTerminatableThread原始碼

public abstract class AbstractTerminatableThread extends Thread 
     implements Terminatable {
	public final TerminationToken terminationToken;

	public AbstractTerminatableThread() {
		super();
		this.terminationToken = new TerminationToken();
	}

      /**
	 * 
	 * @param terminationToken 執行緒間共享的執行緒終止標誌例項
	*/
	public AbstractTerminatableThread(TerminationToken terminationToken) {
		super();
		this.terminationToken = terminationToken;
	}

	protected abstract void doRun() throws Exception;

	protected void doCleanup(Exception cause) {}

	protected void doTerminiate() {}

	@Override
	public void run() {
		Exception ex = null;
		try {
			while (true) {
                                    /*
				 * 在執行執行緒的處理邏輯前先判斷執行緒停止的標誌。
				 */
				if (terminationToken.isToShutdown()
				    && terminationToken.reservations.get() <= 0) {
					break;
				}
				doRun();
			}

		} catch (Exception e) {
			// Allow the thread to terminate in response of a interrupt invocation
			ex = e;
		} finally {
			doCleanup(ex);
		}
	}

	@Override
	public void interrupt() {
		terminate();
	}

	@Override
	public void terminate() {
		terminationToken.setToShutdown(true);
		try {
			doTerminiate();
		} finally {
			// 若無待處理的任務,則試圖強制終止執行緒
			if (terminationToken.reservations.get() <= 0) {
				super.interrupt();
			}
		}
	}

}

AbstractTerminatableThread是一個可複用的TerminatableThread參與者例項。其terminate方法完成了執行緒停止的準備階段。該方法首先將terminationToken的toShutdown變數設定為true,指示目標執行緒可以準備停止了。但是,此時目標執行緒可能處於一些阻塞(Blocking)方法的呼叫,如呼叫Object.sleep、InputStream.read等,無法偵測到該變數。呼叫目標執行緒的interrupt方法可以使一些阻塞方法(參見表1)通過丟擲異常從而使目標執行緒停止。但也有些阻塞方法如InputStream.read並不對interrupt方法呼叫作出響應,此時需要由TerminatableThread的子類實現doTerminiate方法,在該方法中實現一些關閉目標執行緒所需的額外操作。例如,在Socket同步I/O中通過關閉socket使得使用該socket的執行緒若處於I/O等待會丟擲SocketException。因此,terminate方法下一步呼叫doTerminate方法。接著,若terminationToken.reservations的值為非正數(表示目標執行緒無待處理任務、或者我們不關心其是否有待處理任務),則terminate方法會呼叫目標執行緒的interrupt方法,強制目標執行緒的阻塞方法中斷,從而強制終止目標執行緒。

執行階段在AbstractTerminatableThread的run方法中完成。該方法通過對TerminationToken的toShutdown屬性和reservations屬性的判斷或者通過捕獲由interrupt方法呼叫而丟擲的異常來終止執行緒。並在執行緒終止前呼叫由TerminatableThread子類實現的doCleanup方法用於執行一些清理動作。

在執行階段,由於AbstractTerminatableThread.run方法每次執行執行緒處理邏輯(通過呼叫doRun方法實現)前都先判斷下toShutdown屬性和reservations屬性的值,在目標執行緒處理完其待處理的任務後(此時reservations屬性的值為非正數)目標執行緒run方法也就退出了while迴圈。因此,執行緒的處理邏輯程式碼(doRun方法)將不再被呼叫,從而使本案例在不使用Two-phase Termination模式的情況下停止目標執行緒存在的兩個問題得以解決(目標執行緒停止前可以保證其處理完待處理的任務——傳送佇列中現有的告警資訊到伺服器)和規避(目標執行緒傳送完佇列中現有的告警資訊後,doRun方法不再被呼叫,從而避免了佇列為空時BlockingQueue.take呼叫導致的阻塞)。

從上可知,準備階段、執行階段需要通過TerminationToken作為“中介”來協調二者的動作。TerminationToken的原始碼如清單3所示:

清單 3. TerminationToken原始碼

public class TerminationToken {
        //使用volatile修飾,以保證無需顯示鎖的情況下該變數的記憶體可見性
	protected volatile boolean toShutdown = false;
	public final AtomicInteger reservations = new AtomicInteger(0);

	public boolean isToShutdown() {
		return toShutdown;
	}

	protected void setToShutdown(boolean toShutdown) {
		this.toShutdown = true;
	}

}

Two-phase Termination模式的評價與實現考量

Two-phase Termination模式使得我們可以對各種形式的目標執行緒進行優雅的停止。如目標執行緒呼叫了能夠對interrupt方法呼叫作出響應的阻塞方法、目標執行緒呼叫了不能對interrupt方法呼叫作出響應的阻塞方法、目標執行緒作為消費者處理其它執行緒生產的“產品”在其停止前需要處理完現有“產品”等。Two-phase Termination模式實現的執行緒停止可能出現延遲,即客戶端程式碼呼叫完ThreadOwner.shutdown後,該執行緒可能仍在執行。

本文案例展示了一個可複用的Two-phase Termination模式實現程式碼。讀者若要自行實現該模式,可能需要注意以下幾個問題。

執行緒停止標誌

本文案例使用了TerminationToken作為目標執行緒可以準備停止的標誌。從清單3的程式碼我們可以看到,TerminationToken使用了toShutdown這個boolean變數作為主要的停止標誌,而非使用Thread.isInterrupted()。這是因為,呼叫目標執行緒的interrupt方法無法保證目標執行緒的isInterrupted()方法返回值為true:目標執行緒可能呼叫一些能夠捕獲InterruptedException而不保留執行緒中斷狀態的程式碼。另外,toShutdown這個變數為了保證記憶體可見性而又能避免使用顯式鎖的開銷,採用了volatile修飾。這點也很重要,筆者曾經見過一些採用boolean變數作為執行緒停止標誌的程式碼,只是這些變數沒有用volatile修飾,對其訪問也沒有加鎖,這就可能無法停止目標執行緒。

生產者——消費者問題中的執行緒停止

在多執行緒程式設計中,許多問題和一些多執行緒程式設計模式都可以看作生產者——消費者問題。停止處於生產者——消費者問題中的執行緒,需要考慮更多的問題:需要注意執行緒的停止順序,如果消費者執行緒比生產者執行緒先停止則會導致生產者生產的新”產品“無法被處理,而如果先停止生產者執行緒又可能使消費者執行緒處於空等待(如生產者消費者採用阻塞佇列中轉”產品“)。並且,停止消費者執行緒前是否考慮要等待其處理完所有待處理的任務或者將這些任務做個備份也是個問題。本文案例部分地展示生產者——消費者問題中執行緒停止的處理,其核心就是通過使用TerminationToken的reservations變數:生產者每”生產“一個產品,Two-phase Termination模式的呼叫方程式碼要使reservations變數值增加1(terminationToken.reservations.incrementAndGet());消費者執行緒每處理一個產品,Two-phase Termination模式的呼叫方程式碼要使reservations變數值減少1(terminationToken.reservations.decrementAndGet())。當然,在停止消費者執行緒時如果我們不關心其待處理的任務,Two-phase Termination模式的呼叫方程式碼可以忽略對reservations變數的操作。清單4展示了一個完整的停止生產者——消費者問題中的執行緒的例子:

清單 4. 停止生產者——消費者問題中的執行緒的例子

public class ProducerConsumerStop {
	class SampleConsumer<P> {
		private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();

		private AbstractTerminatableThread workThread 
				= new AbstractTerminatableThread() {
			@Override
			protected void doRun() throws Exception {
				terminationToken.reservations.decrementAndGet();
				P product = queue.take();
				// ...
				System.out.println(product);
			}

		};

		public void placeProduct(P product) {
			if (workThread.terminationToken.isToShutdown()) {
				throw new IllegalStateException("Thread shutdown");
			}
			try {
				queue.put(product);
				workThread.terminationToken.reservations.incrementAndGet();
			} catch (InterruptedException e) {

			}
		}

		public void shutdown() {
			workThread.terminate();
		}

		public void start() {
			workThread.start();
		}
	}

	public void test() {
		final SampleConsumer<String> aConsumer = new SampleConsumer<String>();

		AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {
			private int i = 0;

			@Override
			protected void doRun() throws Exception {
				aConsumer.placeProduct(String.valueOf(i));
			}

			@Override
			protected void doCleanup(Exception cause) {
				// 生產者執行緒停止完畢後再請求停止消費者執行緒
				aConsumer.shutdown();
			}

		};

		aProducer.start();
		aConsumer.start();
	}
}

隱藏而非暴露可停止的執行緒

為了保證可停止的執行緒不被其它程式碼誤停止,一般我們將可停止執行緒隱藏線上程擁有者背後,而使系統中其它程式碼無法直接訪問該執行緒,正如本案例程式碼(見清單1)所展示:AlarmMgr定義了一個private欄位alarmSendingThread用於引用告警傳送執行緒(可停止的執行緒),系統中的其它程式碼只能通過呼叫AlarmMgr的shutdown方法來請求該執行緒停止,而非通過引用該執行緒物件自身來停止它。

總結

本文介紹了Two-phase Termination模式的意圖及架構。並結合筆者工作經歷提供了一個實際的案例用於展示一個可複用的Two-phase Termination模式實現程式碼,在此基礎上對該模式進行了評價並分享在實際運用該模式時需要注意的事項。

參考資源

  • 本文的原始碼線上閱讀:https://github.com/Viscent/JavaConcurrencyPattern/
  • Brian Göetz et al.,Java Concurrency In Practice
  • Mark Grand,Patterns in Java,Volume 1, 2nd Edition