1. 程式人生 > >執行緒同步工具(五)執行階段性併發任務

執行緒同步工具(五)執行階段性併發任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 譯者:鄭玉婷

執行階段性併發任務

Java 併發 API 提供的一個非常複雜且強大的功能是,能夠使用Phaser類執行階段性的併發任務。當某些併發任務是分成多個步驟來執行時,那麼此機制是非常有用的。Phaser類提供的機制是在每個步驟的結尾同步執行緒,所以除非全部執行緒完成第一個步驟,否則執行緒不能開始進行第二步。

相對於其他同步應用,我們必須初始化Phaser類與這次同步操作有關的任務數,我們可以通過增加或者減少來不斷的改變這個數。

在這個指南,你將學習如果使用Phaser類來同步3個併發任務。這3個任務會在3個不同的資料夾和它們的子資料夾中搜索副檔名是.log並在24小時內修改過的檔案。這個任務被分成3個步驟:

1. 在指定的資料夾和子資料夾中獲得副檔名為.log的檔案列表。
2. 過濾第一步的列表中修改超過24小時的檔案。
3. 在操控臺列印結果。

在步驟1和步驟2的結尾我們要檢查列表是否為空。如果為空,那麼執行緒直接結束執行並從phaser類中淘汰。

準備

指南中的例子是使用Eclipse IDE 來實現的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 開啟並建立一個新的java任務。

怎麼做呢

按照這些步驟來實現下面的例子:

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

//1.   建立一個類名為FileSearch並一定實現Runnable 介面。這個類實現的操作是在資料夾和其子資料夾中搜索確定的副檔名並在24小時內修改的檔案。
public class FileSearch implements Runnable {

	// 2. 宣告一個私有 String 屬性來儲存搜尋開始的時候的資料夾。
	private String initPath;

	// 3. 宣告另一個私有 String 屬性來儲存我們要尋找的檔案的副檔名。
	private String end;

	// 4. 宣告一個私有 List 屬性來儲存我們找到的符合條件的檔案的路徑。
	private List<String> results;

	// 5. 最後,宣告一個私有 Phaser 屬性來控制任務的不同phaser的同步。
	private Phaser phaser;

	// 6. 實現類的建構函式,初始化類的屬性們。它接收初始資料夾的路徑,檔案的副檔名,和phaser 作為引數。
	public FileSearch(String initPath, String end, Phaser phaser) {
		this.initPath = initPath;
		this.end = end;
		this.phaser = phaser;
		results = new ArrayList<String>();
	}

	// 7. 現在,你必須實現一些要給run() 方法用的輔助方法。第一個是 directoryProcess()
	// 方法。它接收File物件作為引數並處理全部的檔案和子資料夾。對於每個資料夾,此方法會遞迴呼叫並傳遞資料夾作為引數。對於每個檔案,此方法會呼叫fileProcess()
	// 方法。
	private void directoryProcess(File file) {

		File list[] = file.listFiles();
		if (list != null) {
			for (int i = 0; i < list.length; i++) {

				if (list[i].isDirectory()) {
					directoryProcess(list[i]);
				} else {
					fileProcess(list[i]);
				}
			}
		}
	}

	// 8. 現在,實現 fileProcess() 方法。它接收 File
	// 物件作為引數並檢查它的副檔名是否是我們正在查詢的。如果是,此方法會把檔案的絕對路徑寫入結果列表內。
	private void fileProcess(File file) {
		if (file.getName().endsWith(end)) {
			results.add(file.getAbsolutePath());
		}
	}

	// 9. 現在,實現 filterResults()
	// 方法。不接收任何引數。它過濾在第一階段獲得的檔案列表,並刪除修改超過24小時的檔案。首先,建立一個新的空list並獲得當前時間。
	private void filterResults() {
		List<String> newResults = new ArrayList<String>();
		long actualDate = new Date().getTime();

		// 10. 然後,瀏覽結果list裡的所有元素。對於每個路徑,為檔案建立File物件 go through all the elements
		// of the results list. For each path in the list of results, create a
		// File object for that file and get the last modified date for it.
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			long fileDate = file.lastModified();

			// 11. 然後, 對比與真實日期對比,如果相差小於一天,把檔案的路徑加入到新的結果列表。
			if (actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1,
					TimeUnit.DAYS)) {
				newResults.add(results.get(i));
			}
		}

		// 12. 最後,把舊的結果改為新的。
		results = newResults;
	}

	// 13. 現在,實現 checkResults() 方法。此方法在第一個和第二個phase的結尾被呼叫,並檢查結果是否為空。此方法不接收任何引數。
	private boolean checkResults() {

		// 14. 首先,檢查結果List的大小。如果為0,物件寫資訊到操控臺表明情況,然後呼叫Phaser物件的
		// arriveAndDeregister() 方法通知此執行緒已經結束actual phase,並離開phased操作。
		if (results.isEmpty()) {
			System.out.printf("%s: Phase %d: 0 results.\n", Thread
					.currentThread().getName(), phaser.getPhase());
			System.out.printf("%s: Phase %d: End.\n", Thread.currentThread()
					.getName(), phaser.getPhase());
			phaser.arriveAndDeregister();
			return false;

			// 15. 另一種情況,如果結果list有元素,那麼物件寫資訊到操控臺表明情況,呼叫 Phaser物件懂得
			// arriveAndAwaitAdvance() 方法並通知 actual phase,它會被阻塞直到phased
			// 操作的全部參與執行緒結束actual phase。

		} else {
			System.out.printf("%s: Phase %d: %d results.\n", Thread
					.currentThread().getName(), phaser.getPhase(), results
					.size());
			phaser.arriveAndAwaitAdvance();
			return true;
		}
	}

	// 16. 最好一個輔助方法是 showInfo() 方法,列印results list 的元素到操控臺。
	private void showInfo() {
		for (int i = 0; i < results.size(); i++) {
			File file = new File(results.get(i));
			System.out.printf("%s: %s\n", Thread.currentThread().getName(),
					file.getAbsolutePath());
		}
		phaser.arriveAndAwaitAdvance();
	}

	// 17. 現在,來實現 run() 方法,使用之前描述的輔助方法來執行,並使用Phaser物件控制phases間的改變。首先,呼叫phaser物件的
	// arriveAndAwaitAdvance() 方法。直到使用執行緒被建立完成,搜尋行為才會開始。
	@Override
	public void run() {

		phaser.arriveAndAwaitAdvance();

		// 18. 然後,寫資訊到操控臺表明搜尋任務開始。

		System.out.printf("%s: Starting.\n", Thread.currentThread().getName());

		// 19. 檢視 initPath 屬性儲存的資料夾名字並使用 directoryProcess()
		// 方法在資料夾和其子資料夾內查詢帶特殊副檔名的檔案。
		File file = new File(initPath);
		if (file.isDirectory()) {
			directoryProcess(file);
		}

		// 20. 使用 checkResults() 方法檢查是否有結果。如果沒有任何結果,結束執行緒的執行並返回keyword。
		if (!checkResults()) {
			return;
		}

		// 21. 使用filterResults() 方法過濾結果list。
		filterResults();

		// 22. 再次使用checkResults() 方法檢查是否有結果。如果沒有,結束執行緒的執行並返回keyword。
		if (!checkResults()) {
			return;
		}

		// 23. 使用 showInfo() 方法列印最終的結果list到操控臺,撤銷執行緒的登記,並列印資訊表明執行緒的終結。
		showInfo();
		phaser.arriveAndDeregister();
		System.out.printf("%s: Work completed.\n", Thread.currentThread()
				.getName());

	}
}

// 24. 現在,實現例子的main 類通過建立類名為 Main 併為其新增 main() 方法。

class Main {

	public static void main(String[] args) {

		// 25. 建立 含3個參與者的 Phaser 物件。
		Phaser phaser = new Phaser(3);

		// 26. 建立3個 FileSearch 物件,每個在不同的初始資料夾裡搜尋.log副檔名的檔案。
		FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
		FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
		FileSearch documents = new FileSearch("C:\\Documents And Settings",
				"log", phaser);

		// 27. 建立並開始一個執行緒來執行第一個 FileSearch 物件。
		Thread systemThread = new Thread(system, "System");
		systemThread.start();

		// 28. 建立並開始一個執行緒來執行第二個 FileSearch 物件。
		Thread appsThread = new Thread(apps, "Apps");
		appsThread.start();

		// 29. 建立並開始一個執行緒來執行第三個 FileSearch 物件。
		Thread documentsThread = new Thread(documents, "Documents");
		documentsThread.start();

		// 30. 等待3個執行緒們的終結。

		try {
			systemThread.join();
			appsThread.join();
			documentsThread.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		// 31. 使用isFinalized()方法把Phaser物件的結束標誌值寫入操控臺。
		System.out.println("Terminated: " + phaser.isTerminated());
	}
}

它是怎麼工作的…

這程式開始建立的 Phaser 物件是用來在每個phase的末端控制執行緒的同步。Phaser的建構函式接收參與者的數量作為引數。在這裡,Phaser有3個參與者。這個數向Phaser表示 Phaser改變phase之前執行 arriveAndAwaitAdvance() 方法的執行緒數,並叫醒正在休眠的執行緒。

一旦Phaser被建立,我們執行3個執行緒分別執行3個不同的FileSearch物件。

在例子裡,我們使用 Windows operating system 的路徑。如果你使用的是其他作業系統,那麼修改成適應你的環境的路徑。

FileSearch物件的 run() 方法中的第一個指令是呼叫Phaser物件的 arriveAndAwaitAdvance() 方法。像之前提到的,Phaser知道我們要同步的執行緒的數量。當某個執行緒呼叫此方法,Phaser減少終結actual phase的執行緒數,並讓這個執行緒進入休眠 直到全部其餘執行緒結束phase。在run() 方法前面呼叫此方法,沒有任何 FileSearch 執行緒可以開始他們的工作,直到全部執行緒被建立。

在phase 1 和 phase 2 的末端,我們檢查phase 是否生成有元素的結果list,或者它沒有生成結果且list為空。在第一個情況,checkResults() 方法之前提的呼叫 arriveAndAwaitAdvance()。在第二個情況,如果list為空,那就沒有必要讓執行緒繼續了,就直接返回吧。但是你必須通知phaser,將會少一個參與者。為了這個,我們使用arriveAndDeregister()。它通知phaser執行緒結束了actual phase, 但是它將不會繼續參見後面的phases,所以請phaser不要再等待它了。

在phase3的結尾實現了 showInfo() 方法, 呼叫了 phaser 的 arriveAndAwaitAdvance() 方法。這個呼叫,保證了全部執行緒在同一時間結束。當此方法結束執行,有一個呼叫phaser的arriveAndDeregister() 方法。這個呼叫,我們撤銷了對phaser執行緒的註冊,所以當全部執行緒結束時,phaser 有0個參與者。

最後,main() 方法等待3個執行緒的完成並呼叫phaser的 isTerminated() 方法。當phaser 有0個參與者時,它進入termination狀態,此狀態與此呼叫將會列印true到操控臺。

Phaser 物件可能是在這2中狀態:

  1. Active: 當 Phaser 接受新的參與者註冊,它進入這個狀態,並且在每個phase的末端同步。 在此狀態,Phaser像在這個指南里解釋的那樣工作。此狀態不在Java 併發 API中。
  2. Termination: 預設狀態,當Phaser裡全部的參與者都取消註冊,它進入這個狀態,所以這時 Phaser 有0個參與者。更具體的說,當onAdvance() 方法返回真值時,Phaser 是在這個狀態裡。如果你覆蓋那個方法,你可以改變它的預設行為。當 Phaser 在這個狀態,同步方法 arriveAndAwaitAdvance()會 立刻返回,不會做任何同步。

Phaser 類的一個顯著特點是你不需要控制任何與phaser相關的方法的異常。不像其他同步應用,執行緒們在phaser休眠不會響應任何中斷也不會丟擲 InterruptedException 異常。只有一個異常會在下面的‘更多’裡解釋。

下面的裁圖是例子的執行結果:

它展示了前2個phases的執行。你可以發現Apps執行緒在phase 2 結束它的執行由於list 為空。當你執行例子,你會發現一些執行緒比其他的執行緒更快結束phase,但是他們必須等待其他全部結束然後才能繼續。

更多…

The Phaser類還提供了其他相關方法來改變phase。他們是:

  • arrive(): 此方法示意phaser某個參與者已經結束actual phase了,但是他應該等待其他的參與者才能繼續執行。小心使用此法,因為它並不能與其他執行緒同步。
  • awaitAdvance(int phase): 如果我們傳遞的引數值等於phaser的actual phase,此方法讓當前執行緒進入睡眠直到phaser的全部參與者結束當前的phase。如果引數值與phaser 的 actual phase不等,那麼立刻返回。
  • awaitAdvanceInterruptibly(int phaser): 此方法等同與之前的方法,只是在執行緒正在此方法中休眠而被中斷時候,它會丟擲InterruptedException 異常。

Phaser的參與者的註冊
當你建立一個 Phaser 物件,你表明了參與者的數量。但是Phaser類還有2種方法來增加參與者的數量。他們是:

  • register(): 此方法為Phaser新增一個新的參與者。這個新加入者會被認為是還未到達 actual phase.
  • bulkRegister(int Parties): 此方法為Phaser新增一個特定數量的參與者。這些新加入的參與都會被認為是還未到達 actual phase.

Phaser類提供的唯一一個減少參與者數量的方法是arriveAndDeregister() 方法,它通知phaser執行緒已經結束了actual phase,而且他不想繼續phased的操作了。

強制終止 Phaser
當phaser有0個參與者,它進入一個稱為Termination的狀態。Phaser 類提供 forceTermination() 來改變phaser的狀態,讓它直接進入Termination 狀態,不在乎已經在phaser中註冊的參與者的數量。此機制可能會很有用在一個參與者出現異常的情況下來強制結束phaser.

當phaser在 Termination 狀態, awaitAdvance() 和 arriveAndAwaitAdvance() 方法立刻返回一個負值,而不是一般情況下的正值如果你知道你的phaser可能終止了,那麼你可以用這些方法來確認他是否真的終止了。

參見

第八章,測試併發應用:檢測Phaser