1. 程式人生 > >測試併發應用(四)監控Fork/Join池

測試併發應用(四)監控Fork/Join池

宣告:本文是《 Java 7 Concurrency Cookbook 》的第八章, 作者: Javier Fernández González 譯者:鄭玉婷

監控Fork/Join池

Executor 框架提供了執行緒的建立和管理,來實現任務的執行機制。Java 7 包括了一個 Executor 框架的延伸為了一種特別的問題而使用的,將比其他解決方案的表現有所提升(可以直接使用 Thread 物件或者 Executor 框架)。它是 Fork/Join 框架。

此框架是為了解決可以使用 divide 和 conquer 技術,使用 fork() 和 join() 操作把任務分成小塊的問題而設計的。主要的類實現這個行為的是 ForkJoinPool 類。

在這個指南,你將學習從ForkJoinPool類可以獲取的資訊和如何獲取這些資訊。

準備

指南中的例子是使用Eclipse IDE 來實現的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 開啟並建立一個新的java任務。

怎麼做呢…

按照這些步驟來實現下面的例子:

//1.   建立一個類,名為 Task, 擴充套件 RecursiveAction 類。

public class Task extends RecursiveAction {

	// 2. 宣告一個私有 int array 屬性,名為 array,用來儲存你要增加的 array 的元素。
	private int[] array;

	// 3. 宣告2個私有 int 屬性,名為 start 和 end,用來儲存 此任務已經處理的元素塊的頭和尾的位置。
	private int start;
	private int end;

	// 4. 實現類的建構函式,初始化屬性值。
	public Task(int array[], int start, int end) {
		this.array = array;
		this.start = start;
		this.end = end;
	}

	// 5. 用任務的中心邏輯來實現 compute()
	// 方法。如果此任務已經處理了超過100任務,那麼把元素集分成2部分,再建立2個任務分別來執行這些部分,使用 fork() 方法開始執行,並使用
	// join() 方法等待它的終結。
	protected void compute() {
		if (end - start > 100) {
			int mid = (start + end) / 2;
			Task task1 = new Task(array, start, mid);
			Task task2 = new Task(array, mid, end);

			task1.fork();
			task2.fork();

			task1.join();
			task2.join();

			// 6. 如果任務已經處理了100個元素或者更少,那麼在每次操作之後讓執行緒進入休眠5毫秒來增加元素。
		} else {
			for (int i = start; i < end; i++) {
				array[i]++;

				try {
					Thread.sleep(5);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

//7. 建立例子的主類通過建立一個類,名為 Main 並新增 main()方法。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

	public static void main(String[] args) throws Exception {

		// 8. 建立 ForkJoinPool 物件,名為 pool。
		ForkJoinPool pool = new ForkJoinPool();

		// 9. 建立 10,000個元素的整數 array ,名為 array。
		int array[] = new int[10000];

		// 10. 建立新的 Task 物件來處理整個array。
		Task task1 = new Task(array, 0, array.length);

		// 11. 使用 execute() 方法 把任務傳送到pool裡執行。
		pool.execute(task1);

		// 12. 當任務還未結束執行,呼叫 showLog() 方法來把 ForkJoinPool 類的狀態資訊寫入,然後讓執行緒休眠一秒。
		while (!task1.isDone()) {
			showLog(pool);
			TimeUnit.SECONDS.sleep(1);
		}

		// 13. 使用 shutdown() 方法關閉pool。
		pool.shutdown();

		// 14. 使用 awaitTermination() 方法 等待pool的終結。
		pool.awaitTermination(1, TimeUnit.DAYS);

		// 15. 呼叫 showLog() 方法寫關於 ForkJoinPool 類狀態的資訊並寫一條資訊到操控臺表明結束程式。
		showLog(pool);
		System.out.printf("Main: End of the program.\n");
	}

	// 16. 實現 showLog() 方法。它接收 ForkJoinPool 物件作為引數和寫關於執行緒和任務的執行的狀態的資訊。
	private static void showLog(ForkJoinPool pool) {
		System.out.printf("**********************\n");
		System.out.printf("Main: Fork/Join Pool log\n");
		System.out.printf("Main: Fork/Join Pool: Parallelism:%d\n",
				pool.getParallelism());
		System.out.printf("Main: Fork/Join Pool: Pool Size:%d\n",
				pool.getPoolSize());
		System.out.printf("Main: Fork/Join Pool: Active Thread Count:%d\n",
				pool.getActiveThreadCount());
		System.out.printf("Main: Fork/Join Pool: Running Thread Count:%d\n",
				pool.getRunningThreadCount());
		System.out.printf("Main: Fork/Join Pool: Queued Submission:%d\n",
				pool.getQueuedSubmissionCount());
		System.out.printf("Main: Fork/Join Pool: Queued Tasks:%d\n",
				pool.getQueuedTaskCount());
		System.out.printf("Main: Fork/Join Pool: Queued Submissions:%s\n",
				pool.hasQueuedSubmissions());
		System.out.printf("Main: Fork/Join Pool: Steal Count:%d\n",
				pool.getStealCount());
		System.out.printf("Main: Fork/Join Pool: Terminated :%s\n",
				pool.isTerminated());
		System.out.printf("**********************\n");
	}
}

它是如何工作的…

在這個指南,你實現了任務 使用 ForkJoinPool 類和一個擴充套件 RecursiveAction 類的 Task 類來增加array的元素;它是 ForkJoinPool 類執行的任務種類之一。當任務還在處理array時,你就把關於 ForkJoinPool 類的狀態資訊列印到操控臺。
你使用了ForkJoinPool類中的以下方法:

  • getPoolSize(): 此方法返回 int 值,它是ForkJoinPool內部執行緒池的worker執行緒們的數量。
  • getParallelism(): 此方法返回池的並行的級別。
  • getActiveThreadCount(): 此方法返回當前執行任務的執行緒的數量。
  • getRunningThreadCount():此方法返回沒有被任何同步機制阻塞的正在工作的執行緒。
  • getQueuedSubmissionCount(): 此方法返回已經提交給池還沒有開始他們的執行的任務數。
  • getQueuedTaskCount(): 此方法返回已經提交給池已經開始他們的執行的任務數。
  • hasQueuedSubmissions(): 此方法返回 Boolean 值,表明這個池是否有queued任務還沒有開始他們的執行。
  • getStealCount(): 此方法返回 long 值,worker 執行緒已經從另一個執行緒偷取到的時間數。
  • isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已經完成執行。

參見

第五章,Fork/Join 框架: Fork/Join 池的建立
第七章,自定義併發類:實現 ThreadFactory 介面生成Fork/Join 框架的自定義執行緒
第七章,自定義併發類:自定義在the Fork/Join框架的執行任務