1. 程式人生 > >zookeeper java api(1)

zookeeper java api(1)

1 Zookeeper安裝以及啟動

    這裡我已經進行了安裝,並且啟動了Zookeeper。埠是2182

2 Zookeeper config

tickTime=2000

initLimit=10

syncLimit=5

dataDir=D://zookiper/zookeeper/data

clientPort=2182

    引數介紹

tickTime:  這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

dataDir:    顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。

clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。

initLimit:   這個配置項是用來配置 Zookeeper 接受客戶端(這裡所說的客戶端不是使用者連線 Zookeeper 伺服器的客戶端,而是 Zookeeper 伺服器 群中連線到 Leader 的 Follower 伺服器)初始化連線時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是tickTime)長度後 Zookeeper 伺服器還沒有收到客戶端的返回資訊,那麼表明這個客戶端連線失敗。總的時間長度就是 5*2000=10 秒

syncLimit:這個配置項標識 Leader 與 Follower 之間傳送訊息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒

連線zk伺服器

    依賴的pom

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.3-beta</version>
            <type>pom</type>
        </dependency>
    public static ZooKeeper zkClient(){
        
        final String connectString="127.0.0.1:2182";
        final int sessionTimeout=5000;
        ZooKeeper zooKeeper=null;
        try {
             zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(final WatchedEvent event) {
                    /** 判斷是否和伺服器之間取得了連線*/
                    if(event.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("已經觸發了" + event.getType() + "事件!");
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }

    在上述中,connectString,sessionTimeout表示連線伺服器的地址,以及客戶端連線伺服器端的session超時時間。watcher為監視器。zookeeper api和伺服器之間的連線是非同步的。當執行 ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, watcher) 這句程式碼之後,會立馬返回zkclient;當與伺服器建立好連線之後會 ,呼叫Watcher中的process方法進行處理。 process方法會接受一個WatchedEvent型別的引數,用於表明發生了什麼事件。

    下屬程式碼塊可以作為連線zk的模板。

public void process(WatchedEvent watchedEvent) {
    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //判斷是否已連線
        if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
            // 最初與zk伺服器建立好連線
        } else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            // 子節點變化事件
        }
        // ...還可以繼續監聽其它事件型別
    }
    System.out.println(watchedEvent.getState());
}

WatchedEvent包含兩方面重要資訊:

    1 與zk伺服器連線的狀態資訊

    可以呼叫watchedEvent.getState()方法獲取與zk伺服器連線的狀態資訊,狀態資訊取值主要包括SyncConnectedDisconnectedConnectedReadOnlyAuthFailed等等。

    2 發生的具體事件型別資訊
watchedEvent.getState()方法只是獲取與zk伺服器連線的狀態資訊,但在同一個連線狀態下,還會發生很多事件的型別。例如在zk中,我們可以watch一個節點的資料內容,當這個節點的資料被改變時,我們可以獲取到這個事件。類似的還有子節點列表變化事件等等。這就需要我們在SyncConnected同一種連線狀態下區分多個事件型別。可以通過watchedEvent.getType()方法獲取具體的事件型別。

    事件型別的取值包括NoneNodeCreatedNodeDeletedNodeDataChangedNodeChildrenChanged

4 建立節點

    下面要介紹的每種api操作都可以分為兩種型別——同步和非同步。同步操作一般會有返回值,並且會丟擲相應的異常。非同步操作沒有返回值,也不會丟擲異常。此外非同步方法引數在同步方法引數的基礎上,會增加Callback和context兩個引數。如用同步方式建立一個節點的的程式碼如下:

  •     建立同步節點
public static void createNodeSync() throws KeeperException, InterruptedException {
        String path = "/poype_node";
        ZooKeeper zooKeeper = zkClient();
        String nodePath = zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(nodePath);
}

看ZkClient建立節點完畢:

關於上述的GUI工具,可以參見這篇文章:https://www.cnblogs.com/easyworld/p/8463910.html

同步下建立節點的方法create(*)介紹

public String create(final String path, byte data[], 
                           List<ACL> acl,CreateMode createMode) 
                    throws KeeperException, InterruptedException

引數介紹:

  • path:建立節點的路徑
  • data[] : 建立節點的資料值,引數型別是位元組陣列
  • acl:節點的訪問許可權,我們這裡指定該節點可以被任何人訪問

createMode:create命令可以有-s和-e兩個引數,其中-s是順序節點,-e是臨時節點。這裡的CreateMode就是這兩個引數的組合。可選的值:

PERSISTENT
永久節點
PERSISTENT_SEQUENTIAL
永久有序節點
EPHEMERAL
臨時節點
EPHEMERAL_SEQUENTIAL
臨時有序節點

 

  •     建立非同步節點

    非同步模式方法沒有返回值,並且不會丟擲任何異常:除了同步create方法中的四個引數以外,非同步模式的create方法還增加了callback和context兩個引數。StringCallback介面中的processResult方法會在節點建立好之後被呼叫,它有四個引數。第一個是int型別的resultCode,作為建立節點的結果碼,當成功建立節點時,resultCode的值為0。第二個引數是建立節點的路徑。第三個引數是context,當一個StringCallback型別物件作為多個create方法的引數時,這個引數就很有用了。第四個引數是建立節點的名字,其實與path引數相同。

    在我用其中的一種方式,怎麼也建立不了節點,就是如下的程式碼:

 public static ZooKeeper zkClient(){
        /** connectString  ZK連線地址
         *  sessionTimeout 回話超時時間 單位ms
         *  ************************************************
         *  watcher        監視器
         *  關於監視器:zookeeper api與伺服器建立連線的過程是非同步的。
         *  ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, watcher);
         *  上面的呼叫會馬上從ZooKeeper建構函式返回,當與伺服器建立好連線之後會
         *      呼叫Watcher中的process方法進行處理。
         *  process方法會接受一個WatchedEvent型別的引數,用於表明發生了什麼事件。*/
        final String connectString="127.0.0.1:2182";
        final int sessionTimeout=5000;
        ZooKeeper zooKeeper=null;
        try {
             zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(final WatchedEvent watchedEvent) {
                    /** 判斷是否和伺服器之間取得了連線*/
                    if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("已經觸發了" + watchedEvent.getType() + "事件!");
                    }
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }
private static void createNodeAsync() {
        String path = "/poype_node2";
        ZooKeeper zooKeeper = zkClient();
        zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                new MyStringCallBack(), "create");
    }
    static class MyStringCallBack implements AsyncCallback.StringCallback{
        @Override
        public void processResult(final int rc, final String path, final Object ctx, final String name) {
            System.out.println("非同步建立回撥結果:狀態:" + rc +";建立路徑:" +
                    path + ";傳遞資訊:" + ctx + ";實際節點名稱:" + name);
        }
    }

最後採用另外一種方式建立了節點:

  • 同步方式獲取節點資料
public class CreateNode implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2182", 5000, new CreateNode());
        countDownLatch.await();
        /** 非同步建立臨時節點*/
        zooKeeper.create("/poype_node2", "abc".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallBack(), "我是傳遞內容");
        /**  驗證等待回撥結果使用,可根據實際情況自行調整*/
        Thread.sleep(10000);
    }

    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            countDownLatch.countDown();
        }
    }
    static class MyStringCallBack implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("非同步建立回撥結果:狀態:" + rc +";建立路徑:" +
                    path + ";傳遞資訊:" + ctx + ";實際節點名稱:" + name);
        }
    }
}

zooKeeper.getData方法的返回值就是節點中儲存的資料值,它有三個引數,第一個引數是節點的路徑,用於表示要獲取哪個節點中的資料。第三個引數stat用於儲存節點的狀態資訊,在呼叫getData方法前,會先構造一個空的Stat型別物件作為引數傳給getData方法,當getData方法呼叫返回後,節點的狀態資訊會被填充到stat物件中。
第二個引數是一個bool型別的watch,這個引數比較重要。當watch為true時,表示我們想要監控這個節點的資料變化。當節點的資料發生變化時,我們就可以拿到zk伺服器推送給我們的通知。在process方法中會有類似下面的程式碼:

public void process(WatchedEvent watchedEvent) {
    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //與zk伺服器處於連線狀態
        if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
            createNodeAsync();
        } else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            // 節點的子節點列表發生變化
        } else if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            // 節點的資料內容發生變化
        }
    }
}

當節點的資料內容發生變化時,我們就會接收到NodeDataChanged這個事件。值得注意的是,zooKeeper設定的監聽只生效一次,如果在接收到NodeDataChanged事件後還想繼續對該節點的資料內容改變進行監聽,需要在事件處理邏輯中重新呼叫getData方法並將watch引數設定為true。
非同步獲取一個節點資料值的程式碼如下:

  • 非同步方式獲取節點資料
ublic class CreateNode implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2182", 5000, new CreateNode());
        countDownLatch.await();
    
        zooKeeper.getData("/poype_node2",true,new MyStringCallBackData(),"非同步獲取節點的資料值");
        Thread.sleep(10000);

    }

    @Override
    public void process(WatchedEvent event) {
        if (Event.KeeperState.SyncConnected == event.getState()) {
            countDownLatch.countDown();
        }
    }
   
    static class MyStringCallBackData implements AsyncCallback.DataCallback{

        @Override
        public void processResult(final int rc, final String path, final Object ctx,
                final byte[] data, final Stat stat) {
            System.out.println(rc);
            System.out.println(path);
            System.out.println(ctx);
            System.out.println("***********");
            System.out.println(new String(data));
            System.out.println(stat);
        }
    }
}

參考:https://segmentfault.com/a/1190000012262940

        :  https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/index.html

        :  https://www.cnblogs.com/easyworld/p/8463910.html

        :https://blog.csdn.net/wo541075754/article/details/65625481