1. 程式人生 > >ZooKeeper系列(9):ZooKeeper實現分布式Barrier和Queue

ZooKeeper系列(9):ZooKeeper實現分布式Barrier和Queue

nod zookeeper instant zook conf protected tint 說了 this

1. 快速開始

1.1概述:

Zookeeper是Hadoop的一個子項目,它是分布式系統中的協調系統,可提供的服務主要有:配置服務、名字服務、分布式同步、組服務等。
1.2 使用常見

1.2.1 統一配置

把配置放在ZooKeeper的節點中維護,當配置變更時,客戶端可以收到變更的通知,並應用最新的配置。

1.2.2,集群管理

集群中的節點,創建ephemeral的節點,一旦斷開連接,ephemeral的節點會消失,其它的集群機器可以收到消息。

1.2.3 分布式鎖

多個客戶端發起節點創建操作,只有一個客戶端創建成功,從而獲得鎖。

1.3 安裝和配置

通過官方下載鏈接zookeeper 進行下載,解壓後進入conf目錄,新建一個zoo.conf文件,配置內容如下:

tickTime=2000    
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataLogDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort=4399
initLimit=5
syncLimit=2

tickTime: ZooKeeper基本時間單位(ms)
initLimit: 指定了啟動zookeeper時,zookeeper實例中的隨從實例同步到領導實例的初始化連接時間限制,超出時間限制則連接失敗(以tickTime為時間單位);
syncLimit: 指定了zookeeper正常運行時,主從節點之間同步數據的時間限制,若超過這個時間限制,那麽隨從實例將會被丟棄

dataDir: zookeeper存放數據的目錄;
clientPort: 用於連接客戶端的端口

接下來進入bin目錄啟動ZooKeeper實例以及客戶端連接:

./zkServer.sh start
./zkCli.sh -server localhost:4399

接下來看看集群如何配置,其實跟單機差不多,這裏我們把剛剛下載的Zookeeper復制多兩份,一共是三個,配置信息如下:

tickTime=2000    
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort
=4399 initLimit=5 syncLimit=2 server.1=127.0.0.1:8880:9990 server.2=127.0.0.1:8881:9991 server.3=127.0.0.1:8882:9992

三個文件夾下面的zoo.conf都是這個格式,需要修改dataDir,dataDir,clientPort,
然後在dataDir所指向的目錄下面新建一個myid文件,對應server.x,比如第一個文件夾下面的myid就填入一個1,第二個就填入一個2,以此類推。接著依次啟動即可。可以采用下面的命令

echo "1" > myid

2.使用java來操作ZooKeeper實例
一門技術最重要的就算實戰了,接下來的內容將圍繞這一部分來講。
首先是Znode的創建和刪除
Znode有兩種類型:短暫的和持久的。短暫的znode在創建的客戶端與服務器端斷開(無論是明確的斷開還是故障斷開)連接時,該znode都會被刪除;相反,持久的znode則不會

技術分享圖片
public class CreateGroup implements Watcher {
    //會話延時
    private static final int SESSION_TIMEOUT = 1000;
    //zk對象
    private ZooKeeper zk = null;
    //同步計數器
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    //客戶端連接到服務器時會觸發觀察者進行調用
    public void process(WatchedEvent event) {
        if(event.getState() == KeeperState.SyncConnected){
            countDownLatch.countDown();//計數器減一
        }
    }

    public void connect(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();//阻塞程序繼續執行
    }
    //創建GROUP
    public void create(String groupName) throws KeeperException, InterruptedException{
        String path = "/" + groupName;
        //允許任何客戶端對該znode進行讀寫,以及znode進行持久化
        String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created "+createPath);
    }
    //關閉zk
    public void close() throws InterruptedException{
        if(zk != null){
            try {
                zk.close();
            } catch (InterruptedException e) {
                throw e;
            }finally{
                zk = null;
                System.gc();
            }
        }
    }

    //測試主類
    public static void main(String args[]){
        String host = "127.0.0.1:4399";
        String groupName = "test";
        CreateGroup createGroup = new CreateGroup();
        try {
            createGroup.connect(host);
            createGroup.create(groupName);
            createGroup.close();
            createGroup = null;
            System.gc();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }   
    }
}
View Code

接下來把創建和銷毀分離出來作為一個獨立的類,以後相關操作可以直接使用

技術分享圖片
public class ConnetctionWatcher implements Watcher {

    private static final int SESSION_TIMEOUT = 5000;

    protected ZooKeeper zk = null;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void process(WatchedEvent event) {
        KeeperState state = event.getState();

        if(state == KeeperState.SyncConnected){
            countDownLatch.countDown();
        }
    }
    public void connection(String hosts) throws IOException, InterruptedException {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
        countDownLatch.await();
    }
    public void close() throws InterruptedException {
        if (null != zk) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                throw e;
            }finally{
                zk = null;
                System.gc();
            }
        }
    }
}
View Code

接下來我們看看節點如何刪除

技術分享圖片
public class DeleteGroup extends ConnetctionWatcher {
    public void delete(String groupName) {
        String path = "/" + groupName;

        try {
            List<String> children = zk.getChildren(path, false);

            for(String child : children){
                zk.delete(path + "/" + child, -1);
            }
            zk.delete(path, -1);//版本號為-1,
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
View Code

3. 利用java實現分布式Barrier
Barrier是一種控制和協調多個任務觸發次序的機制。簡單來說就是用一個屏障把將要執行的任務攔住,等待所有任務都處於可運行狀態才放開屏障,其實在單機上我們可以利用CyclicBarrier來實現這個機制,但是在分布式環境下,我們可以利用ZooKeeper可以派上用場,我們可以利用一個Node來作為Barrier的實體,然後要Barrier的任務通過調用exists檢測是否Node存在,當需要打開Barrier時候,刪除這個Node,這樣ZooKeeper的watch機制會通知到各個任務可以開始執行。接下來看代碼:

技術分享圖片
public class Barrier extends SyncPrimitive {
    int size;
    String name;

    Barrier(String address, String root, int size) {
        super(address);
        this.root = root;
        this.size = size;
        //創建Barrier的Node
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
        try {
            name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
        } catch (UnknownHostException e) {
            System.out.println(e.toString());
        }

    }

    /**
     * 加入Barrier等待
     */

    boolean enter() throws KeeperException, InterruptedException{
        zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    return true;
                }
            }
        }
    }

    /**
     * 一直等待知道指定數量節點到達
     */

    boolean leave() throws KeeperException, InterruptedException{
        zk.delete(root + "/" + name, 0);
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                    if (list.size() > 0) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
    }
}
View Code

父類代碼如下

技術分享圖片
public class SyncPrimitive implements Watcher {
    static ZooKeeper zk = null;
    static Integer mutex;
    //根節點
    String root;
    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        Random rand = new Random();
        int r = rand.nextInt(100);
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){
        } catch (InterruptedException e){
        }
        System.out.println("Left barrier");
    }
    //測試用的主類
    public static void main(String args[]) {
        /*
        args =new String[] {"qTest","localhost:4399","3","c"};
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);
         */
    }
}
View Code

4. 分布式隊列(Queue)
在分布式環境下,實現Queue需要高一致性來保證,那麽我們可以這樣來設計。把一個Node當成一個隊列,然後children用來存儲內容,利用ZooKeeper提供的順序遞增的模式(會自動在name後面加入一個遞增的數字來插入新元素)。於是在offer時候我們可以使用create,take時候按照順序把children第一個delete就可以了。ZooKeeper保證了各個server上數據是一致的。廢話不多說了,直接看代碼

技術分享圖片
/**
 * 一個消費者-生產者模式的消息隊列
 */
public class Queue extends SyncPrimitive {

    Queue(String address, String name) {
        super(address);
        this.root = name;
        if (zk != null) {
            try {
                Stat s = zk.exists(root, false);
                if (s == null) {
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                System.out.println("Keeper exception when instantiating queue: " + e.toString());
            } catch (InterruptedException e) {
                System.out.println("Interrupted exception");
            }
        }
    }

    /**
     * 隊列中插入數據
     */

    boolean produce(int i) throws KeeperException, InterruptedException{
        ByteBuffer b = ByteBuffer.allocate(4);
        byte[] value;

        b.putInt(i);
        value = b.array();
        zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);

        return true;
    }


    /**
     * 把元素從隊列中移除
     */
    int consume() throws KeeperException, InterruptedException{
        int retvalue = -1;
        Stat stat = null;

        //得到現在隊列中首個可用的節點
        while (true) {
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, true);
                if (list.size() == 0) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    Integer min = new Integer(list.get(0).substring(7));
                    for(String s : list){
                        Integer tempValue = new Integer(s.substring(7));
                        //System.out.println("Temporary value: " + tempValue);
                        if(tempValue < min) min = tempValue;
                    }
                    System.out.println("Temporary value: " + root + "/element" + min);
                    byte[] b = zk.getData(root + "/element" + min, false, stat);
                    zk.delete(root + "/element" + min, 0);
                    ByteBuffer buffer = ByteBuffer.wrap(b);
                    retvalue = buffer.getInt();

                    return retvalue;
                }
            }
        }
    }
}
View Code

ZooKeeper系列(9):ZooKeeper實現分布式Barrier和Queue