1. 程式人生 > >二十八、併發程式設計之併發工具類CyclicBarrier詳解

二十八、併發程式設計之併發工具類CyclicBarrier詳解

一、概述

CyclicBarrier是一個同步工具類,它允許一組執行緒互相等待,直到到達某個公共屏障點。與CountDownLatch不同的是該barrier在釋放等待執行緒後可以重用,所以稱它為迴圈(Cyclic)的屏障(Barrier)。
CyclicBarrier支援一個可選的Runnable命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作很有用。

二、提供的方法:

//parties表示屏障攔截的執行緒數量,當屏障撤銷時,先執行barrierAction,然後在釋放所有執行緒
public CyclicBarrier(int parties, Runnable barrierAction) //barrierAction預設為null public CyclicBarrier(int parties) /* * 當前執行緒等待直到所有執行緒都呼叫了該屏障的await()方法 * 如果當前執行緒不是將到達的最後一個執行緒,將會被阻塞。解除阻塞的情況有以下幾種 * 1)最後一個執行緒呼叫await() * 2)當前執行緒被中斷 * 3)其他正在該CyclicBarrier上等待的執行緒被中斷 * 4)其他正在該CyclicBarrier上等待的執行緒超時 * 5)其他某個執行緒呼叫該CyclicBarrier的reset()方法 * * 如果當前執行緒在進入此方法時已經設定了該執行緒的中斷狀態或者在等待時被中斷,將丟擲InterruptedException,並且清除當前執行緒的已中斷狀態。 * 如果線上程處於等待狀態時barrier被reset()或者在呼叫await()時 barrier 被損壞,將丟擲 BrokenBarrierException 異常。 * 如果任何執行緒在等待時被中斷,則其他所有等待執行緒都將丟擲 BrokenBarrierException 異常,並將 barrier 置於損壞狀態。 * 如果當前執行緒是最後一個將要到達的執行緒,並且構造方法中提供了一個非空的屏障操作(barrierAction),那麼在允許其他執行緒繼續執行之前,當前執行緒將執行該操作。 * 如果在執行屏障操作過程中發生異常,則該異常將傳播到當前執行緒中,並將 barrier 置於損壞狀態。 * * 返回值為當前執行緒的索引,0表示當前執行緒是最後一個到達的執行緒 */
public int await() throws InterruptedException, BrokenBarrierException //在await()的基礎上增加超時機制,如果超出指定的等待時間,則丟擲 TimeoutException 異常。如果該時間小於等於零,則此方法根本不會等待。 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //將屏障重置為其初始狀態。如果所有參與者目前都在屏障處等待,則它們將返回,同時丟擲一個BrokenBarrierException。
public void reset()

對於失敗的同步嘗試,CyclicBarrier 使用了一種要麼全部要麼全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致執行緒過早地離開了屏障點,那麼在該屏障點等待的其他所有執行緒也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。

三、實現原理

基於ReentrantLock和Condition機制實現。除了getParties()方法,CyclicBarrier的其他方法都需要獲取鎖。

	private final ReentrantLock lock = new ReentrantLock();    //可重入鎖
	private final Condition trip = lock.newCondition();
	private final int parties;    //攔截的執行緒數量
	private final Runnable barrierCommand;    //當屏障撤銷時,需要執行的屏障操作
	//當前的Generation。每當屏障失效或者開閘之後都會自動替換掉。從而實現重置的功能。
	private Generation generation = new Generation();

	//還能阻塞的執行緒數(即parties-當前阻塞的執行緒數),當新建generation或generation被破壞時,count會被重置。因為對Count的操作都是在獲取鎖之後,所以不需要其他同步措施。
	private int count;    

	//靜態內聯類
	private static class Generation {
	    boolean broken = false;    //當前的屏障是否破壞
	}
  • await()
	public int await() throws InterruptedException, BrokenBarrierException {
	    try {
	        return dowait(false, 0L);
	    } catch (TimeoutException toe) {
	        throw new Error(toe); // cannot happen;
	    }
	}

	private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {
	    final ReentrantLock lock = this.lock;
	    lock.lock();    //獲取鎖
	    try {
	        //儲存此時的generation
	        final Generation g = generation;
	        //判斷屏障是否被破壞
	        if (g.broken)
	            throw new BrokenBarrierException();
	        //判斷執行緒是否被中斷,如果被中斷,呼叫breakBarrier()進行屏障破壞處理,並丟擲InterruptedException
	        if (Thread.interrupted()) {
	            breakBarrier();    
	            throw new InterruptedException();
	        }

	        int index = --count;    //剩餘count遞減,並賦值給執行緒索引,作為方法的返回值
	        //如果執行緒索引將為0,說明當前執行緒是最後一個到達的執行緒。執行可能存在的屏障操作 barrierCommand,設定下一個Generation。相當於每次開閘之後都進行了一次reset。
	        if (index == 0) {  // tripped    
	            boolean ranAction = false;
	            try {
	                final Runnable command = barrierCommand;
	                if (command != null)
	                    command.run();    //同步執行barrierCommand
	                ranAction = true;
	                nextGeneration();    //執行成功設定下一個nextGeneration
	                return 0;
	            } finally {
	                if (!ranAction)    //如果barrierCommand執行失敗,進行屏障破壞處理
	                    breakBarrier();
	            }
	        }
	        
	        //如果當前執行緒不是最後一個到達的執行緒
	        for (;;) {
	            try {
	                if (!timed)
	                    trip.await();    //呼叫Condition的await()方法阻塞
	                else if (nanos > 0L)
	                    nanos = trip.awaitNanos(nanos);    //呼叫Condition的awaitNanos()方法阻塞
	            } catch (InterruptedException ie) {
	                //如果當前執行緒被中斷,則判斷是否有其他執行緒已經使屏障破壞。若沒有則進行屏障破壞處理,並丟擲異常;否則再次中斷當前執行緒
	                if (g == generation && ! g.broken) {    
	                    breakBarrier();
	                    throw ie;
	                } else {
	                    Thread.currentThread().interrupt();
	                    //這種捕獲了InterruptException之後呼叫Thread.currentThread().interrupt()是一種通用的方式。其實就是為了儲存中斷狀態,從而讓其他更高層次的程式碼注意到這個中斷。
	                }
	            }
	            //如果屏障被破壞,當前執行緒拋BrokenBarrierException
	            if (g.broken)
	                throw new BrokenBarrierException();
	            
	            //如果已經換代,直接返回index(last thread已經執行的nextGeneration,但當前執行緒還沒有執行到該語句)
	            if (g != generation)
	                return index;
	            
	            //超時,進行屏障破壞處理,並拋TimeoutException
	            if (timed && nanos <= 0L) {
	                breakBarrier();
	                throw new TimeoutException();
	            }
	        }
	    } finally {
	        lock.unlock();    //釋放鎖
	    }
	}

	//將當前屏障置為破壞狀態、重置count、並喚醒所有被阻塞的執行緒。
	//必須先獲取鎖,才能呼叫此方法
	private void breakBarrier() {
	    generation.broken = true;
	    count = parties;
	    trip.signalAll();
	}

	//喚醒trip上等待的所有執行緒,設定下一個Generation
	private void nextGeneration() {
	    trip.signalAll();
	    count = parties;
	    generation = new Generation();
	}
  • reset()
//重置屏障,先進行屏障破壞處理,再設定下一代generation
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

四、例子

public class Demo {
	Random random = new Random();
	public void meeting(CyclicBarrier barrier) {
		try {
			Thread.sleep(random.nextInt(40000));//讓執行緒隨機休息一段時間
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"到達了會議室,等待開會");
//		if(Thread.currentThread().getName().equals("Thread-1")) {
//			throw new RuntimeException();
//		}
		try {
			barrier.await();//等待
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()+"發言");
	}
	public static void main(String[] args) {
		Demo d = new Demo();
		CyclicBarrier barrier = new CyclicBarrier(10,new Runnable() {
			@Override
			public void run() {
				System.out.println("好!,我們開始開會。。。");
			}
		});
		for(int i=0;i<10;i++) {
			new Thread(new Runnable(){
				@Override
				public void run() {
					d.meeting(barrier);
				}
			}).start();
		}
	}
}

五、CyclicBarrier與CountDownLatch比較

  • CountDownLatch是執行緒組之間的等待,即一個(或多個)執行緒等待N個執行緒完成某件事情之後再執行;而CyclicBarrier則是執行緒組內的等待,即每個執行緒相互等待,即N個執行緒都被攔截之後,然後依次執行。
  • CountDownLatch是減計數方式,而CyclicBarrier是加計數方式。
  • CountDownLatch計數為0無法重置,而CyclicBarrier計數達到初始值,則可以重置。
  • CountDownLatch不可以複用,而CyclicBarrier可以複用。
  • CountDownLatch基於AQS;CyclicBarrier基於鎖和Condition。本質上都是依賴於volatile和CAS實現的。

原文