1. 程式人生 > >跟著例項學習ZooKeeper的用法: Barrier

跟著例項學習ZooKeeper的用法: Barrier

分散式Barrier是這樣一個類: 它會阻塞所有節點上的等待程序,知道某一個被滿足, 然後所有的節點繼續進行。

比如賽馬比賽中, 等賽馬陸續來到起跑線前。 一聲令下,所有的賽馬都飛奔而出。

柵欄Barrier

DistributedBarrier類實現了柵欄的功能。 它的建構函式如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你需要設定柵欄,它將阻塞在它上面等待的執行緒:

setBarrier();

然後需要阻塞的執行緒呼叫“方法等待放行條件:

public void waitOnBarrier()

當條件滿足時,移除柵欄,所有等待的執行緒將繼續執行:

removeBarrier();

異常處理 DistributedBarrier 會監控連線狀態,當連線斷掉時waitOnBarrier()方法會丟擲異常。

看一個例子:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import
java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; import
org.apache.curator.test.TestingServer; public class DistributedBarrierExample { private static final int QTY = 5; private static final String PATH = "/examples/barrier"; public static void main(String[] args) throws Exception { try (TestingServer server = new TestingServer()) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); ExecutorService service = Executors.newFixedThreadPool(QTY); DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH); controlBarrier.setBarrier(); for (int i = 0; i < QTY; ++i) { final DistributedBarrier barrier = new DistributedBarrier(client, PATH); final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { Thread.sleep((long) (3 * Math.random())); System.out.println("Client #" + index + " waits on Barrier"); barrier.waitOnBarrier(); System.out.println("Client #" + index + " begins"); return null; } }; service.submit(task); } Thread.sleep(10000); System.out.println("all Barrier instances should wait the condition"); controlBarrier.removeBarrier(); service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } } }

這個例子建立了controlBarrier來設定柵欄和移除柵欄。 我們建立了5個執行緒,在此Barrier上等待。 最後移除柵欄後所有的執行緒才繼續執行。

如果你開始不設定柵欄,所有的執行緒就不會阻塞住。

雙柵欄Double Barrier

雙柵欄允許客戶端在計算的開始和結束時同步。當足夠的程序加入到雙柵欄時,程序開始計算, 當計算完成時,離開柵欄。 雙柵欄類是DistributedDoubleBarrier。 建構函式為:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成員數量,當enter方法被呼叫時,成員被阻塞,直到所有的成員都呼叫了enter。 當leave方法被呼叫時,它也阻塞呼叫執行緒, 知道所有的成員都呼叫了leave。 就像百米賽跑比賽, 發令槍響, 所有的運動員開始跑,等所有的運動員跑過終點線,比賽才結束。

DistributedBarrier 會監控連線狀態,當連線斷掉時enter()leave方法會丟擲異常。

例子程式碼:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " enters");
                        barrier.enter();
                        System.out.println("Client #" + index + " begins");
                        Thread.sleep((long) (3000 * Math.random()));
                        barrier.leave();
                        System.out.println("Client #" + index + " left");
                        return null;
                    }
                };
                service.submit(task);
            }


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}