1. 程式人生 > >執行緒同步工具(四)在同一個點同步任務

執行緒同步工具(四)在同一個點同步任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 譯者:鄭玉婷

在同一個點同步任務

Java 併發 API 提供了可以允許2個或多個執行緒在在一個確定點的同步應用。它是 CyclicBarrier 類。此類與在此章節的等待多個併發事件完成指南中的 CountDownLatch 類相似,但是它有一些特殊性讓它成為更強大的類。

CyclicBarrier 類有一個整數初始值,此值表示將在同一點同步的執行緒數量。當其中一個執行緒到達確定點,它會呼叫await() 方法來等待其他執行緒。當執行緒呼叫這個方法,CyclicBarrier阻塞執行緒進入休眠直到其他執行緒到達。當最後一個執行緒呼叫CyclicBarrier 類的await() 方法,它喚醒所有等待的執行緒並繼續執行它們的任務。

CyclicBarrier 類有個有趣的優勢是,你可以傳遞一個外加的 Runnable 物件作為初始引數,並且當全部執行緒都到達同一個點時,CyclicBarrier類 會把這個物件當做執行緒來執行。此特點讓這個類在使用 divide 和 conquer 程式設計技術時,可以充分發揮任務的並行性,

在這個指南,你將學習如何使用 CyclicBarrier 類來讓一組執行緒在一個確定點同步。你也將使用 Runnable 物件,它將會在全部執行緒都到達確定點後被執行。在這個例子裡,你將在數字矩陣中查詢一個數字。矩陣會被分成多個子集(使用divide 和 conquer 技術),所以每個執行緒會在一個子集中查詢那個數字。一旦全部行程執行結束,會有一個最終任務來統一他們的結果。

準備

指南中的例子是使用Eclipse IDE 來實現的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 開啟並建立一個新的java專案。

怎麼做呢

按照這些步驟來實現下面的例子::

//1.  我們從實現2個輔助類開始。首先,建立一個類名為 MatrixMock。此類隨機生成一個在1-10之間的 數字矩陣,我們將從中查詢數字。
public class MatrixMock {

//2.   宣告私有 int matrix,名為 data。
private int data[][];

//3.   實現類的建構函式。此建構函式將接收矩陣的行數,行的長度,和我們將要查詢的數字作為引數。3個引數全部int 型別。
public MatrixMock(int size, int length, int number){

//4.   初始化建構函式將使用的變數和物件。
int counter=0;
data=new int[size][length];
Random random=new Random();

//5.   用隨機數字填充矩陣。每生成一個數字就與要查詢的數字對比,如果相等,就增加counter值。
for (int i=0; i<size; i++) {
	for (int j=0; j<length; j++){
		data[i][j]=random.nextInt(10);
		if (data[i][j]==number){
			counter++;
		}
	}
}

//6.   最後,在操控臺列印一條資訊,表示查詢的數字在生成的矩陣裡的出現次數。此資訊是用來檢查執行緒們獲得的正確結果的。
System.out.printf("Mock: There are %d ocurrences of number in generated data.\n",counter,number); //譯者注:把字串裡的number改為%d.

//7.	實現 getRow() 方法。此方法接收一個 int為引數,是矩陣的行數。返回行數如果存在,否則返回null。
public int[] getRow(int row){
	if ((row>=0)&&(row<data.length)){
		return data[row];
	}
	return null;
}

//8.   現在,實現一個類名為 Results。此類會在array內儲存被查詢的數字在矩陣的每行裡出現的次數。
public class Results {

//9.   宣告私有 int array 名為 data。
private int data[];

//10. 實現類的建構函式。此建構函式接收一個表明array元素量的整數作為引數。
public Results(int size){
	data=new int[size];
}

//11. 實現 setData() 方法。此方法接收array的某個位置和一個值作為引數,然後把array的那個位置設定為那個值。
public void	setData(int position, int value){
	data[position]=value;
}

//12. 實現 getData() 方法。此方法返回結果 array。
public int[] getData(){
return data;
}

//13. 現在你有了輔助類,是時候來實現執行緒了。首先,實現 Searcher 類。這個類會在隨機數字的矩陣中的特定的行裡查詢數字。建立一個類名為Searcher 並一定實現  Runnable 介面.
public class Searcher implements Runnable {

//14. 宣告2個私有int屬性名為 firstRow 和 lastRow。這2個屬性是用來確定將要用的子集的行。
private int firstRow;
private int lastRow;

//15. 宣告一個私有 MatrixMock 屬性,名為 mock。
private MatrixMock mock;

//16. 宣告一個私有 Results 屬性,名為 results。
private Results results;

//17.  宣告一個私有 int 屬性名為 number,用來儲存我們要查詢的數字。
private int number;

//18. 宣告一個 CyclicBarrier 物件,名為 barrier。
private final CyclicBarrier barrier;

//19. 實現類的建構函式,並初始化之前宣告的全部屬性。
public Searcher(int firstRow, int lastRow, NumberMock mock, Results results, int number, CyclicBarrier barrier){
	this.firstRow=firstRow;
	this.lastRow=lastRow;
	this.mock=mock;
	this.results=results;
	this.number=number;
	this.barrier=barrier;
}

//20. 實現 run() 方法,用來查詢數字。它使用內部變數,名為counter,用來儲存數字在每行出現的次數。
@Override
public void run() {
	int counter;

//21. 在操控臺列印一條資訊表明被分配到這個物件的行。
System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);

//22. 處理分配給這個執行緒的全部行。對於每行,記錄正在查詢的數字出現的次數,並在相對於的 Results 物件中儲存此資料。
for (int i=firstRow; i<lastRow; i++){
	int row[]=mock.getRow(i);
	counter=0;
	for (int j=0; j<row.length; j++){
		if (row[j]==number){
		counter++;
	}
}

results.setData(i, counter);
}

//23. 列印資訊到操控臺表明此物件已經結束搜尋。
System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName());

//24. 呼叫 CyclicBarrier 物件的 await() 方法 ,由於可能丟擲的異常,要加入處理 InterruptedException and BrokenBarrierException 異常的必需程式碼。
try {
	barrier.await();
} catch (InterruptedException e) {
	e.printStackTrace();
} catch (BrokenBarrierException e) {
	e.printStackTrace();
}

//25. 現在,實現一個類來計算數字在這個矩陣裡出現的總數。它使用儲存了矩陣中每行裡數字出現次數的 Results 物件來進行運算。建立一個類,名為 Grouper 並一定實現 Runnable 介面.
public class Grouper implements Runnable {

//26. 宣告一個私有 Results 屬性,名為 results。
private Results results;

//27.  實現類的建構函式,並初始化 Results 屬性。
public Grouper(Results results){
this.results=results;
}

//28.實現 run() 方法,用來計算結果array裡數字出現次數的總和。
@Override
public void run() {

//29. 宣告一個 int 變數並寫在操控臺寫一條資訊表明開始處理了。
int finalResult=0;
System.out.printf("Grouper: Processing results...\n");

//30. 使用 results 物件的 getData() 方法來獲得每行數字出現的次數。然後,處理array的全部元素,把每個元素的值加給 finalResult 變數。
int data[]=results.getData();
for (int number:data){
finalResult+=number;
}

//31. 在操控臺列印結果。
System.out.printf("Grouper: Total result: %d.\n",finalResult);

//32. 最後, 實現例子的 main 類,通過建立一個類,名為 Main 併為其新增 main() 方法。
public class Main {

public static void main(String[] args) {

//33. 宣告並初始5個常熟來儲存應用的引數。
final int ROWS=10000;
final int NUMBERS=1000;
final int SEARCH=5;
final int PARTICIPANTS=5;
final int LINES_PARTICIPANT=2000;

//34. Create a MatrixMock 物件,名為 mock. 它將有 10,000 行,每行1000個元素。現在,你要查詢的數字是5。
MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);

//35. 建立 Results 物件,名為 results。它將有 10,000 元素。
Results results=new Results(ROWS);

//36. 建立 Grouper 物件,名為 grouper。
Grouper grouper=new Grouper(results);

//37.  建立 CyclicBarrier 物件,名為 barrier。此物件會等待5個執行緒。當此執行緒結束後,它會執行前面建立的 Grouper 物件。
CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);

//38. 建立5個 Searcher 物件,5個執行他們的執行緒,並開始這5個執行緒。
Searcher searchers[]=new Searcher[PARTICIPANTS];
for (int i=0; i<PARTICIPANTS; i++){
	searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_ PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
	Thread thread=new Thread(searchers[i]);
	thread.start();
}
System.out.printf("Main: The main thread has finished.\n");

它是怎麼工作的…

以下裁圖是例子的執行結果:

例子中解決的問題比較簡單。我們有一個很大的隨機的整數矩陣,然後你想知道這矩陣裡面某個數字出現的次數。為了更好的執行,我們使用了 divide 和 conquer 技術。我們 divide 矩陣成5個子集,然後在每個子集裡使用一個執行緒來查詢數字。這些執行緒是 Searcher 類的物件。

我們使用 CyclicBarrier 物件來同步5個執行緒的完成,並執行 Grouper 任務處理個別結果,最後計算最終結果。

如我們之前提到的,CyclicBarrier 類有一個內部計數器控制到達同步點的執行緒數量。每次執行緒到達同步點,它呼叫 await() 方法告知 CyclicBarrier 物件到達同步點了。CyclicBarrier 把執行緒放入睡眠狀態直到全部的執行緒都到達他們的同步點。

當全部的執行緒都到達他們的同步點,CyclicBarrier 物件叫醒全部正在 await() 方法中等待的執行緒們,然後,選擇性的,為CyclicBarrier的建構函式 傳遞的 Runnable 物件(例子裡,是 Grouper 物件)建立新的執行緒執行外加任務。

更多…

CyclicBarrier 類有另一個版本的 await() 方法:

  • await(long time, TimeUnit unit): 執行緒會一直休眠直到被中斷;內部計數器到達0,或者特定的時間過去了。TimeUnit類有多種常量: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.

此類也提供了 getNumberWaiting() 方法,返回被 await() 方法阻塞的執行緒數,還有 getParties() 方法,返回將與CyclicBarrier同步的任務數。

重置 CyclicBarrier 物件
CyclicBarrier 類與CountDownLatch有一些共同點,但是也有一些不同。最主要的不同是,CyclicBarrier物件可以重置到它的初始狀態,重新分配新的值給內部計數器,即使它已經被初始過了。

可以使用 CyclicBarrier的reset() 方法來進行重置操作。當這個方法被呼叫後,全部的正在await() 方法裡等待的執行緒接收到一個 BrokenBarrierException 異常。此異常在例子中已經用列印stack trace處理了,但是在一個更復制的應用,它可以執行一些其他操作,例如重新開始執行或者在中斷點恢復操作。

破壞 CyclicBarrier 物件
CyclicBarrier 物件可能處於一個特殊的狀態,稱為 broken。當多個執行緒正在 await() 方法中等待時,其中一個被中斷了,此執行緒會收到 InterruptedException 異常,但是其他正在等待的執行緒將收到 BrokenBarrierException 異常,並且 CyclicBarrier 會被置於broken 狀態中。

CyclicBarrier 類提供了isBroken() 方法,如果物件在 broken 狀態,返回true,否則返回false。

參見