1. 程式人生 > >java併發之CyclicBarrier(障礙器)

java併發之CyclicBarrier(障礙器)

CyclicBarrier是一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。 

//設定parties、count及barrierCommand屬性。   
CyclicBarrier(int):   
  
//當await的數量到達了設定的數量後,首先執行該Runnable物件。   
CyclicBarrier(int,Runnable):   
  
//通知barrier已完成執行緒   
await(): 


適用情況:你希望建立一組任務,它們併發地執行工作,另外的一個任務在這一組任務併發執行結束前一直阻塞等待,直到該組任務全部執行結束,這個任務才得以執行。這非常像CountDownLatch,只是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。
    下面給出一個簡單的例項來說明其用法:

import java.util.concurrent.BrokenBarrierException;   
import java.util.concurrent.CyclicBarrier;    
public class CyclicBarrierTest {   
        public static void main(String[] args) {   
                //建立CyclicBarrier物件,  
                //並設定執行完一組5個執行緒的併發任務後,再執行MainTask任務  
                CyclicBarrier cb = new CyclicBarrier(5, new MainTask());   
                new SubTask("A", cb).start();   
                new SubTask("B", cb).start();   
                new SubTask("C", cb).start();   
                new SubTask("D", cb).start();   
                new SubTask("E", cb).start();  
        }   
}   
  
/**  
* 最後執行的任務 
*/   
class MainTask implements Runnable {   
        public void run() {   
                System.out.println("......執行最後的任務了......");   
        }   
}   
  
/**  
* 一組併發任務  
*/   
class SubTask extends Thread {   
        private String name;   
        private CyclicBarrier cb;   
  
        SubTask(String name, CyclicBarrier cb) {   
                this.name = name;   
                this.cb = cb;   
        }   
  
        public void run() {   
                System.out.println("[併發任務" + name + "]  開始執行");   
                for (int i = 0; i < 999999; i++) ;    //模擬耗時的任務   
                System.out.println("[併發任務" + name + "]  開始執行完畢,通知障礙器");   
                try {   
                        //每執行完一項任務就通知障礙器   
                        cb.await();   
                } catch (InterruptedException e) {   
                        e.printStackTrace();   
                } catch (BrokenBarrierException e) {   
                        e.printStackTrace();   
                }   
        }   
} 
結果:

[併發任務A]  開始執行
[併發任務B]  開始執行
[併發任務B]  開始執行完畢,通知障礙器
[併發任務C]  開始執行
[併發任務C]  開始執行完畢,通知障礙器
[併發任務A]  開始執行完畢,通知障礙器
[併發任務D]  開始執行
[併發任務D]  開始執行完畢,通知障礙器
[併發任務E]  開始執行
[併發任務E]  開始執行完畢,通知障礙器
......終於要執行最後的任務了......

package com.gpl.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest2 {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final CyclicBarrier cb = new CyclicBarrier(3); // 三個執行緒同時到達
		for (int i = 0; i < 3; i++) {
			Runnable runnable = new Runnable() {
				public void run() {
					try {
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("執行緒"
								+ Thread.currentThread().getName()
								+ "即將到達集合地點1,當前已有"
								+ (cb.getNumberWaiting() + 1)
								+ "個已到達"
								+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
										: "正在等候"));
						try {
							cb.await();
						} catch (BrokenBarrierException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("執行緒"
								+ Thread.currentThread().getName()
								+ "即將到達集合地點2,當前已有"
								+ (cb.getNumberWaiting() + 1)
								+ "個已到達"
								+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
										: "正在等候"));
						try {
							cb.await();
						} catch (BrokenBarrierException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						Thread.sleep((long) (Math.random() * 10000));
						System.out.println("執行緒"
								+ Thread.currentThread().getName()
								+ "即將到達集合地點3,當前已有"
								+ (cb.getNumberWaiting() + 1)
								+ "個已到達"
								+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
										: "正在等候"));
						try {
							cb.await();
						} catch (BrokenBarrierException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			};
			service.execute(runnable);
		}
		service.shutdown();
	}
}
結果:

執行緒pool-1-thread-3即將到達集合地點1,當前已有1個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點1,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點2,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點3,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點3,當前已有2個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點1,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-2即將到達集合地點2,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點2,當前已有2個已到達正在等候
執行緒pool-1-thread-1即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒pool-1-thread-1即將到達集合地點3,當前已有1個已到達正在等候
執行緒pool-1-thread-3即將到達集合地點3,當前已有2個已到達正在等候
執行緒pool-1-thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊

等價的一個程式:

package com.gpl.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest3 {
	public static void main(String[] args){
		CyclicBarrier cb = new CyclicBarrier(3); // 三個執行緒同時到達
		 new SubTask1("A", cb).start();
		 new SubTask1("B", cb).start(); 
	     new SubTask1("C", cb).start();
	}
	 
}

class SubTask1 extends Thread { 
    private String name; 
    private CyclicBarrier cb; 

    SubTask1(String name, CyclicBarrier cb) { 
            this.name = name; 
            this.cb = cb; 
    } 

    public void run() { 
            //System.out.println("[併發任務" + name + "]  開始執行"); 
            //for (int i = 0; i < 999999; i++) ;    //模擬耗時的任務 
            
            try {
				Thread.sleep((long) (Math.random() * 10000));
				System.out.println("執行緒"+ Thread.currentThread().getName()+ "即將到達集合地點1,當前已有"
						+ (cb.getNumberWaiting() + 1)+ "個已到達"+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊": "正在等候"));
				try { 
				        //每執行完一項任務就通知障礙器 
				        cb.await(); 
				} catch (InterruptedException e) { 
				        e.printStackTrace(); 
				} catch (BrokenBarrierException e) { 
				        e.printStackTrace(); 
				}Thread.sleep((long) (Math.random() * 10000));
				System.out.println("執行緒"
						+ Thread.currentThread().getName()
						+ "即將到達集合地點2,當前已有"
						+ (cb.getNumberWaiting() + 1)
						+ "個已到達"
						+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
								: "正在等候"));
				try {
					cb.await();
				} catch (BrokenBarrierException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				Thread.sleep((long) (Math.random() * 10000));
				System.out.println("執行緒"
						+ Thread.currentThread().getName()
						+ "即將到達集合地點3,當前已有"
						+ (cb.getNumberWaiting() + 1)
						+ "個已到達"
						+ (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走啊"
								: "正在等候"));
				try {
					cb.await();
				} catch (BrokenBarrierException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} 
    } 
}
結果:

執行緒Thread-0即將到達集合地點1,當前已有1個已到達正在等候
執行緒Thread-1即將到達集合地點1,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點1,當前已有3個已到達都到齊了,繼續走啊
執行緒Thread-1即將到達集合地點2,當前已有1個已到達正在等候
執行緒Thread-0即將到達集合地點2,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點2,當前已有3個已到達都到齊了,繼續走啊
執行緒Thread-0即將到達集合地點3,當前已有1個已到達正在等候
執行緒Thread-1即將到達集合地點3,當前已有2個已到達正在等候
執行緒Thread-2即將到達集合地點3,當前已有3個已到達都到齊了,繼續走啊