1. 程式人生 > >Java 多執行緒 執行緒互相等待 CyclicBarrier

Java 多執行緒 執行緒互相等待 CyclicBarrier

先介紹一下JDK內容:

java.util.concurrent 
類 CyclicBarrier
java.lang.Object
  繼承者 java.util.concurrent.CyclicBarrier

--------------------------------------------------------------------------------

public class CyclicBarrierextends Object一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。 

CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。 

示例用法:下面是一個在並行分解設計中使用 barrier 的例子: 

 class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;
   
   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await(); 
         } catch (InterruptedException ex) { 
return; 
         } catch (BrokenBarrierException ex) { 
return; 
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     barrier = new CyclicBarrier(N, 
                                 new Runnable() {
                                   public void run() { 
                                     mergeRows(...); 
                                   }
                                 });
     for (int i = 0; i < N; ++i) 
       new Thread(new Worker(i)).start();

     waitUntilDone();
   }
 }
 在這個例子中,每個 worker 執行緒處理矩陣的一行,在處理完所有的行之前,該執行緒將一直在屏障處等待。處理完所有的行之後,將執行所提供的 Runnable 屏障操作,併合並這些行。如果合併者確定已經找到了一個解決方案,那麼 done() 將返回 true,所有的 worker 執行緒都將終止。 
如果屏障操作在執行時不依賴於正掛起的執行緒,則執行緒組中的任何執行緒在獲得釋放時都能執行該操作。為方便此操作,每次呼叫 await() 都將返回能到達屏障處的執行緒的索引。然後,您可以選擇哪個執行緒應該執行屏障操作,例如: 

  if (barrier.await() == 0) {
     // log the completion of this iteration
   }對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致執行緒過早地離開了屏障點,那麼在該屏障點等待的其他所有執行緒也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 

記憶體一致性效果:執行緒中呼叫 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,後者依次 happen-before 緊跟在從另一個執行緒中對應 await() 成功返回的操作。 



從以下版本開始: 
1.5 
另請參見:
CountDownLatch
然後是Java程式設計思想中的介紹:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * CyclicBarrier
 * 
 * CyclicBarrier適用於這樣的情況:你希望建立一組任務,它們並行地執行工作,然後在進行下一個步驟之前等待,
 * 直到所有任務都完成(看起來有些像join())。它使得所有的並行任務都將在柵欄處列隊,因此可以一直地向前移動。
 * 這非常像CountDownLatch,只是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。
 * 
 * 對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:
 * 如果因為中斷、失敗或者超時等原因,導致執行緒過早地離開了屏障點,那麼在該屏障點等待的其他所有執行緒也將通過 BrokenBarrierException
 * (如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 
 * 記憶體一致性效果:執行緒中呼叫 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,
 * 後者依次 happen-before 緊跟在從另一個執行緒中對應 await() 成功返回的操作。 
 * 
 * 下面是Hosrac的賽馬遊戲的面向物件的多執行緒版本,其中使用了CyclicBarrier
 * 
 */
/**
 * 馬的物件,每匹馬有自己的編號,每次前進將產生一個隨機數的步數,前進一次後進行等待。
 * 
 * @create @author Henry @date 2017-1-3
 * 
 */
class Horse implements Runnable {
	private static int counter = 0;
	private final int id = counter++;
	private int strides = 0;
	private static Random rand = new Random(47);
	private static CyclicBarrier barrier;

	public Horse(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	public synchronized int getStrides() {
		return strides;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				synchronized (this) {
					strides += rand.nextInt(3);
					if(getStrides()==14)
						throw new Exception("world");
				}
				barrier.await();
			}
		} catch (InterruptedException e) {
			System.err.println("InterruptedException");
		} catch (BrokenBarrierException e) {
			System.err.println("BrokenBarrierException");
			throw new RuntimeException(e);
		} catch (Exception e) {
			System.err.println("Exception");
			throw new RuntimeException(e);
		}
	}

	@Override
	public String toString() {
		return "Horse " + id + " ";
	}

	/**
	 * 輸出馬跑的步數。
	 * 
	 * @return
	 */
	public String tracks() {
		StringBuilder s = new StringBuilder();
		for (int i = 0; i < getStrides(); i++)
			s.append("*");
		s.append(id);
		return s.toString();
	}
}

/**
 * 馬場跑道,定義長度為75。
 * 
 * @create @author Henry @date 2017-1-3
 * 
 */
public class HorseRace {
	static final int FINISH_LINE = 75;
	private List<Horse> horses = new ArrayList<Horse>();
	private ExecutorService exec = Executors.newCachedThreadPool();
	private CyclicBarrier barrier;

	public HorseRace(int nHorses, final int pause) {
		barrier = new CyclicBarrier(nHorses, new Runnable() {

			@Override
			public void run() {
				StringBuilder s = new StringBuilder();
				for (int i = 0; i < FINISH_LINE; i++)
					s.append("=");
				System.out.println(s.toString());
				for (Horse horse : horses)
					System.out.println(horse.tracks());
				for (Horse horse : horses)
					if (horse.getStrides() >= FINISH_LINE) {
						System.out.println(horse + " won!");
						exec.shutdownNow();
						return;
					}
				try {
					TimeUnit.MILLISECONDS.sleep(pause);
				} catch (InterruptedException e) {
					System.err.println("barrier-action sleep interrupted");
				}
			}
		});
		for (int i = 0; i < nHorses; i++) {
			Horse horse = new Horse(barrier);
			horses.add(horse);
			exec.execute(horse);
		}
	}

	public static void main(String[] args) {
		int nHorses = 8;
		int pause = 200;
		new HorseRace(nHorses, pause);
	}
}

    可以向CyclicBarrier提供一個“柵欄動作”,它是一個Runnable,當計數值到達0時自動執行---這是CyclicBarrier和CountDownLatch之間的另一個區別。這裡,柵欄動作是作為匿名內部類建立的,它被提交給了CyclicBarrier的構造器。

    我試圖讓每匹馬都列印自己,但是之後的顯式順序取決於工作管理員。CyclicBarrier使得每匹馬都要執行為了向前移動所必需執行的所有工作,然後必須在柵欄處等待其他所有的馬都準備完畢,當所有的馬都向前移動的時候,CyclicBarrier將自動呼叫Runnale柵欄動作任務,按順序顯式馬和終點線的位置。

    一旦所有的任務都越過了柵欄,它就會自動地為下一回合比賽做好準備。

    為了展示這個非常簡單的動畫效果,你需要將控制檯視窗的尺寸調整為小到只有馬時,才會展示出來。