1. 程式人生 > >Zookeeper的java客戶端API使用方法(五)

Zookeeper的java客戶端API使用方法(五)

前面幾篇博文,我們簡單的介紹了一下zookeeper,如何安裝zookeeper叢集,以及如何使用命令列等。這篇博文我們重點來看下Zookeeper的java客戶端API使用方式。

建立會話

客戶端可以通過建立一個Zookeeper(org.apache.zookeeper.ZooKeeper)例項來連線ZooKeeper伺服器。給大家推薦一篇博文,構造方法和引數都介紹的非常的詳細,看一下就好。

我們看一下建立會話的程式碼就好了。

public class CreateSession implements Watcher{

    private static ZooKeeper zookeeper;
    //建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊)  
public static void main(String[] args) throws IOException, InterruptedException { zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession()); System.out.println(zookeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } private void doSomething(){ System.out.println("do something"
); } @Override public void process(WatchedEvent event) { System.out.println("Receive watched event:" + event); if(event.getState() == KeeperState.SyncConnected){ System.out.println("ZooKeeper session established."); doSomething(); } } }

需要我們注意就一點:ZooKeeper 允許客戶端向服務端註冊一個 Watcher 監聽,當服務端的一些指定事件觸發了這個 Watcher,那麼就會向指定客戶端傳送一個事件通知來實現分散式的通知功能。

zookeeper的所有的API,都有同步和非同步兩種方式,使用非同步API時,client可為每個operation設定callback,在operation被執行後,zookeeper會執行對應的callback。

下面給大家展示一下兩個方式建立節點的不同。

同步建立節點

public class CreateNodeSync implements Watcher{

    private static ZooKeeper zookeeper;
     //建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊) 
    public static void main(String[] args) throws IOException, InterruptedException {

        zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNodeSync());
        System.out.println(zookeeper.getState());
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * 建立Znode
      * CreateMode:
      *     PERSISTENT (持續的,相對於EPHEMERAL,不會隨著client的斷開而消失)
      *     PERSISTENT_SEQUENTIAL(持久的且帶順序的)
      *     EPHEMERAL (短暫的,生命週期依賴於client session)
      *     EPHEMERAL_SEQUENTIAL  (短暫的,帶順序的)
      */
    private void doSomething(){
        try {
            String path = zookeeper.create("/node_2", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("return path:" + path);

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("do something");
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if(event.getState() == KeeperState.SyncConnected){
            doSomething();
        }
    }
}

非同步建立節點

public class CreateNodeASync implements Watcher {

    private static ZooKeeper zookeeper;
     //建立一個與伺服器的連線 需要(服務端的 ip+埠號)(session過期時間)(Watcher監聽註冊) 
    public static void main(String[] args) throws IOException, InterruptedException {

        zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNodeASync());
        System.out.println(zookeeper.getState());
        Thread.sleep(Integer.MAX_VALUE);
    }
    private void doSomething() {
        zookeeper.create("/node_1", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                new IStringCallback(), "this is content");

        System.out.println("do something");
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (event.getState() == KeeperState.SyncConnected) {
            doSomething();
        }
    }

    static class IStringCallback implements AsyncCallback.StringCallback {

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {

            System.out.println("rc:" + rc);
            System.out.println("path:" + path);
            System.out.println("ctx:" + ctx);
            System.out.println("name:" + name);

        }
    }
}

他們的執行結果就不再給大家展示了,篇幅會比較大,我對API對zookeeper節點增刪改查,都做了同步和非同步的編寫,大家可以在下面的連結地址中下載。

下載

這裡寫圖片描述

總結:

  • 在這裡api對zookeeper節點進行增刪改查,有同步和非同步的方
  • zookeeper不支援遞迴建立子節點(也就是說在父節點不存在的情況下,不允許建立子節點)
  • zookeeper不支援遞迴刪除(也就是說在父節點有子節點的情況下,不允許直接刪除父節點)

下篇博文,我們進行zookeeper的實戰部分。