1. 程式人生 > >zookeeper開源客戶端Curator典型應用場景之-Barrier屏障(十三)

zookeeper開源客戶端Curator典型應用場景之-Barrier屏障(十三)

什麼是Barrier

Barrier是這樣的:Barrier是一個同步點,每一個程序到達此點都要等待,直到某一個條件滿足,然後所有的節點繼續進行。
比如:賽跑大家都知道,所有比賽人員都會在起跑線外等待,直到教練員的槍響之後,所有參賽者立刻開始賽跑。
JDK的併發包下有CyclicBarrier,它看起來和CountDownLatch有很大的相似之處:

  • CountDownLatch:是一個同步的輔助類,允許一個或多個執行緒,等待其他一組執行緒完成操作,再繼續執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。
  • CyclicBarrier:是一個同步的輔助類,允許一組執行緒相互之間等待,達到一個共同點,再繼續執行。

就拿上面的賽跑舉例子,比如 我們需要 10名參賽者,每當有一名人報名,需要的人數就減一,直到報滿10個人為止。這個時候就用CountDownLatch,假如說下午2點開始比賽,有的參賽者來的早,那麼它需要等待其他參賽者到來之後才開始進行比賽,這個時候就用 CyclicBarrier。
下面是使用jdk的CyclicBarrier模擬賽跑:

public class TestCyclicBarrier {

    /** 參賽人數 */
    public static Integer RUNNER_COUNT =
3; public static CyclicBarrier barrier = new CyclicBarrier(RUNNER_COUNT); public static void main(String[] args) throws IOException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=1; i<=RUNNER_COUNT; i++){ final
int index = i; executor.submit(new Thread(new Runnable() { @Override public void run() { System.out.println("參賽者" + index+ "準備好了."); try { TestCyclicBarrier.barrier.await(); } catch (Exception e) {} System.out.println("參賽者" + index+" 開跑!"); } })); } executor.shutdown(); } }

上面是使用JDK自帶的CyclicBarrier實現的賽跑例子,可以看到多執行緒在併發情況下,都會準確的等待所有執行緒都處於就緒狀態後才開始同時執行其他業務邏輯。如果是在同一個JVM中的話,使用CyclicBarrier完全可以解決諸如此類的多執行緒同步問題。但是,如果在分散式環境中又該如何解決呢?Curator中提供DistributedBarrier就是用來實現分散式Barrier的。

DistributedBarrier

DistributedBarrier類實現了屏障的功能。 它的建構函式如下:

public DistributedBarrier(CuratorFramework client, String barrierPath);

首先你需要設定屏障,它將阻塞執行到此的當前執行緒:

// 設定屏障,每個執行緒設定一次
barrier.setBarrier();

然後需要阻塞的執行緒呼叫,‘方法等待放行條件’,如果連線丟失,此方法將丟擲異常:

barrier.waitOnBarrier();

當條件滿足時,移除屏障,所有等待的執行緒將繼續執行:

removeBarrier();
例子:
public class DistributedBarrierExample {

    private static final String PATH = "/examples/barrier";

    /** 客戶端數量 */
    private static final int CLIENT_COUNT = 5;

    private static DistributedBarrier  barrier;

    public static void main(String[] args) throws Exception {

        for(int i=0;i<CLIENT_COUNT;i++){
            final int index = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.newClient("172.20.10.9:2181",3000,3000, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
                        client.start();

                        //獲取DistributedBarrier
                        barrier = new DistributedBarrier(client, PATH);
                        System.out.println("執行緒" +index+" 等待");
                        barrier.setBarrier();
                        barrier.waitOnBarrier();
                        System.out.println("執行緒" +index+" 已執行");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        Thread.sleep(20*1000);

        if(barrier != null){
            System.out.println("所有執行緒都已到達,準備啟動");
            barrier.removeBarrier();
        }
    }

}

我們建立了5個執行緒,在此Barrier上等待。最後移除柵欄後所有的執行緒才繼續執行。但是我們並不知道要什麼時候移除屏障,Curator還提供了另一種執行緒自發觸發Barrier釋放的模式。

DistributedDoubleBarrier

雙重屏障,在協作開始之前同步,當足夠數量的程序加入到屏障後,開始協作,當所有程序完畢後離開屏障。
雙柵欄類是DistributedDoubleBarrier。
建構函式為:

public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty);

memberQty是成員數量,當enter()方法被呼叫時,成員被阻塞,直到所有的成員都呼叫了enter()。 當leave()方法被呼叫時,它也阻塞呼叫執行緒, 直到所有的成員都呼叫了leave()。
就像賽跑比賽, 發令槍響, 所有的參賽者開始跑,等所有的參賽者跑過終點線,比賽才結束。
DistributedDoubleBarrier 會監控連線狀態,當連線斷掉時enter()和leave方法會丟擲異常。

例子:
public class DistributedDoubleBarrierExample {

    private static final String PATH = "/examples/barrier";

    /** 客戶端數量 */
    private static final int CLIENT_COUNT = 5;

    public static void main(String[] args) throws Exception {

        for(int i=0;i<CLIENT_COUNT;i++){
            final int index = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.newClient("172.20.10.9:2181",3000,3000, new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE));
                        client.start();

                        //獲取DistributedDoubleBarrier
                        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH,CLIENT_COUNT);
                        System.out.println("執行緒" +index+" 等待");
                        barrier.enter();
                        //呼叫enter阻塞,直到所有執行緒都到達之後執行,執行完畢之後,呼叫leave阻塞,直到所有執行緒都呼叫leave
                        System.out.println("執行緒" +index+" 已執行");
                        barrier.leave();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        Thread.sleep(Integer.MAX_VALUE);
    }

}

上面這個示例程式和JDK自帶的CyclicBarrier非常類似,他們都指定了進入Barrier的成員數閾值memberQty,每個Barrier的參與者都會在呼叫DistributedDoubleBarrier.enter()方法之後進行等待,此時處於準備進入狀態。一旦準備進入Barrier的成員數達到指定數量之後,所有的成員會被同時觸發進入。之後呼叫DistributedDoubleBarrier.leave()方法則會再次等待,此時處於退出狀態。一旦準備退出Barrier的成員數達到5個後,所有的成員同樣會被同時觸發退出。因此,使用Curator的DistributedDoubleBarrier能夠很好的實現一個分散式Barrier,並控制其同時進入和退出。