1. 程式人生 > >關於對CountDownLatch、CyclicBarrier、Semaphore執行緒同步理解

關於對CountDownLatch、CyclicBarrier、Semaphore執行緒同步理解

概念描述以及程式碼理解

  1. CountDownLatch(閉鎖)

出現再JDK1.5中,主要是使一個執行緒A或是組執行緒A等待其它執行緒執行完畢後,一個執行緒A或是組執行緒A才繼續執行,可以實現執行緒組同步執行,並在所有執行緒組結束後再執行等待的執行緒,閉鎖的狀態是一次性。 例如:主執行緒等待執行緒組執行完畢後再執行,是執行緒組之間的等待。

public class CountDownLatchTest {

    // 模擬了100米賽跑,10名選手已經準備就緒,只等裁判一聲令下。當所有人都到達終點時,比賽結束。
    public static void main(String[] args) throws InterruptedException {

        // 開始的倒數鎖 
        final CountDownLatch begin = new CountDownLatch(1);  

        // 結束的倒數鎖 
        final CountDownLatch end = new CountDownLatch(10);  

        // 十名選手 
        final ExecutorService exec = Executors.newFixedThreadPool(10);  

        for (int index = 0; index < 10; index++) {
            final int NO = index + 1;  
            Runnable run = new Runnable() {
                public void run() {  
                    try {  
                        // 如果當前計數為零,則此方法立即返回。
                        // 等待
                        begin.await();  
                        Thread.sleep((long) (Math.random() * 10000));  
                        System.out.println("No." + NO + " arrived");  
                    } catch (InterruptedException e) {  
                    } finally {  
                        // 每個選手到達終點時,end就減一
                        end.countDown();
                    }  
                }  
            };  
            exec.submit(run);
        }  
        System.out.println("Game Start");  
        // begin減一,開始遊戲
        begin.countDown();  
        // 等待end變為0,即所有選手到達終點
        end.await();  
        System.out.println("Game Over");  
        exec.shutdown();  
    }
}

wait還有一個超時的方法 public boolean await(long timeout, TimeUnit unit) throws InterruptedException 使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷或超出了指定的等待時間。如果當前計數為零,則此方法立刻返回 1. 在進入此方法時已經設定了該執行緒的中斷狀態;或者 在等待時被中斷,則丟擲 InterruptedException,並且清除當前執行緒的已中斷狀態。 2. 如果超出了指定的等待時間,則返回值為 false

Game Start
No.9 arrived
No.6 arrived
No.8 arrived
No.7 arrived
No.10 arrived
No.1 arrived
No.5 arrived
No.4 arrived
No.2 arrived
No.3 arrived
Game Over

執行過程理解:一共有11個執行緒,主執行緒建立並啟動10個子執行緒,然後子執行緒組再執行的時候被 begin.await()都阻塞了,主執行緒繼續執行到 begin.countDown()時,begin 計數器變成0, end.await()阻塞主執行緒,10個子執行緒繼續執行,因為begin 計數器成0,就不會被阻塞了,等10個子執行緒執行完畢(end計數器減一),end為0時,end.await()執行後面的程式碼,執行流程結束。

2.CyclicBarrier(迴圈屏障)

主要是一組執行緒使用await()方法之後,執行緒就處於barrier狀態了,當所有執行緒都到達各自的barrier後,再同時執行各自barrier下面的程式碼,是執行緒之間的互相等待。例如:團隊旅遊,一個團隊通常分為幾組,每組人走的路線可能不同,但都需要到達某一地點等待團隊其它成員到達後才能進行下一站,是執行緒組內間的等待。

  • CyclicBarrier提供的方法有:
——CyclicBarrier(parties)

初始化相互等待的執行緒數量的構造方法。

——CyclicBarrier(parties,Runnable barrierAction)

初始化相互等待的執行緒數量以及屏障執行緒的構造方法。

屏障執行緒的執行時機:等待的執行緒數量=parties之後,CyclicBarrier開啟屏障之前。

舉例:在分組計算中,每個執行緒負責一部分計算,最終這些執行緒計算結束之後,交由屏障執行緒進行彙總計算。

——getParties()

獲取CyclicBarrier開啟屏障的執行緒數量,也成為方數。

——getNumberWaiting()

獲取正在CyclicBarrier上等待的執行緒數量。

——await()

在CyclicBarrier上進行阻塞等待,直到發生以下情形之一:


在CyclicBarrier上等待的執行緒數量達到parties,則所有執行緒被釋放,繼續執行。
當前執行緒被中斷,則丟擲InterruptedException異常,並停止等待,繼續執行。
其他等待的執行緒被中斷,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
其他等待的執行緒超時,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
其他執行緒呼叫CyclicBarrier.reset()方法,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
 //建構函式1:初始化-開啟屏障的方數
CyclicBarrier barrier0 = new CyclicBarrier(2);
//通過barrier.getParties()獲取開啟屏障的方數
LOGGER.info("barrier.getParties()獲取開啟屏障的方數:" + barrier0.getParties());
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:初始----" + barrier0.getNumberWaiting());
System.out.println();
new Thread(() -> {
    //新增一個等待執行緒
    LOGGER.info("新增第1個等待執行緒----" + Thread.currentThread().getName());
    try {
        barrier0.await();
        LOGGER.info(Thread.currentThread().getName() + " is running...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(10);
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:新增第1個等待執行緒---" + barrier0.getNumberWaiting());
Thread.sleep(10);
System.out.println();
new Thread(() -> {
    //新增一個等待執行緒
    LOGGER.info("新增第2個等待執行緒----" + Thread.currentThread().getName());
    try {
        barrier0.await();
        LOGGER.info(Thread.currentThread().getName() + " is running...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    LOGGER.info(Thread.currentThread().getName() + " is terminated.");
}).start();
Thread.sleep(100);
System.out.println();
//通過barrier.getNumberWaiting()獲取正在等待的執行緒數
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumberWaiting());

//已經開啟的屏障,再次有執行緒等待的話,還會重新生效--視為迴圈
new Thread(() -> {
    LOGGER.info("屏障開啟之後,再有執行緒加入等待:" + Thread.currentThread().getName());
    try {
        //BrokenBarrierException
        barrier0.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

}).start();
System.out.println();
Thread.sleep(10);
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumberWaiting());
Thread.sleep(10);
new Thread(() -> {
    LOGGER.info("屏障開啟之後,再有執行緒加入等待:" + Thread.currentThread().getName());
    try {
        //BrokenBarrierException
        barrier0.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

}).start();
Thread.sleep(10);
LOGGER.info("通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---" + barrier0.getNumber
2018-04-01 13:27:55 INFO - barrier.getParties()獲取開啟屏障的方數:2

2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:初始----0

2018-04-01 13:27:55 INFO - 新增第1個等待執行緒----Thread-0
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:新增第1個等待執行緒---1

2018-04-01 13:27:55 INFO - 新增第2個等待執行緒----Thread-1
2018-04-01 13:27:55 INFO - Thread-1 is running...
2018-04-01 13:27:55 INFO - Thread-0 is running...
2018-04-01 13:27:55 INFO - Thread-1 is terminated.
2018-04-01 13:27:55 INFO - Thread-0 is terminated.

2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---0

2018-04-01 13:27:55 INFO - 屏障開啟之後,再有執行緒加入等待:Thread-2
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---1
2018-04-01 13:27:55 INFO - 屏障開啟之後,再有執行緒加入等待:Thread-3
2018-04-01 13:27:55 INFO - Thread-3 is terminated.
2018-04-01 13:27:55 INFO - Thread-2 is terminated.
2018-04-01 13:27:55 INFO - 通過barrier.getNumberWaiting()獲取正在等待的執行緒數:開啟屏障之後---0
  • 熟悉reset()的用法

如果是一個初始的CyclicBarrier,則reset()之後,什麼也不會發生,如果在等待過程中,執行reset()方法,等待的執行緒跑出BrokenBarrierException異常,並不再等待。

在這裡插入程式碼片
  1. Semaphore (訊號量)

是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源,這個理解應該不難。

package javalearning;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
	private Semaphore smp = new Semaphore(3); 
	private Random rnd = new Random();
	
	class TaskDemo implements Runnable{
		private String id;
		TaskDemo(String id){
			this.id = id;
		}
		@Override
		public void run(){
			try {
				smp.acquire();
				System.out.println("Thread " + id + " is working");
				Thread.sleep(rnd.nextInt(1000));
				smp.release();
				System.out.println("Thread " + id + " is over");
			} catch (InterruptedException e) {
			}
		}
	}
	
	public static void main(String[] args){
		SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
		//注意我建立的執行緒池型別,
		ExecutorService se = Executors.newCachedThreadPool();
		se.submit(semaphoreDemo.new TaskDemo("a"));
		se.submit(semaphoreDemo.new TaskDemo("b"));
		se.submit(semaphoreDemo.new TaskDemo("c"));
		se.submit(semaphoreDemo.new TaskDemo("d"));
		se.submit(semaphoreDemo.new TaskDemo("e"));
		se.submit(semaphoreDemo.new TaskDemo("f"));
		se.shutdown();
	}
}
執行結果

Thread c is working

Thread b is working

Thread a is working

Thread c is over

Thread d is working

Thread b is over

Thread e is working

Thread a is over

Thread f is working

Thread d is over

Thread e is over

Thread f is over

可以看出,最多同時有三個執行緒併發執行,也可以認為有三個公共資源