1. 程式人生 > >zookeeper使用(二):javaAPI基本操作和迴圈監聽器使用

zookeeper使用(二):javaAPI基本操作和迴圈監聽器使用

本文介紹如何連線zookeeper叢集、對zookeeper資料的crud、以及迴圈監聽器的使用

引入pom依賴

引入的zookeeper版本依賴應該與安裝的zookeeper版本一致。注,需要註釋掉type標籤

    <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId
>
zookeeper</artifactId> <version>3.4.9</version> <!--<type>pom</type>--> </dependency>

CRUD測試

針對crud命令做測試的幾個方法

package com.mym.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import
org.junit.Test; import java.io.IOException; import java.util.List; public class ZookeeperTest { ZooKeeper zkClient = null; String createNodeName = "/apiTest10000000007"; //記錄建立的節點名稱 /** * 建立連線 * @throws IOException */ @Before public void connect() throws IOException { if
(zkClient == null){ zkClient = new ZooKeeper("192.168.31.201:2181,192.168.31.202:2181,192.168.31.203:2181", 1000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("success to connect zk cluster!"); } }); } } /** * 測試建立節點 * @throws KeeperException * @throws InterruptedException */ @Test public void testCreate() throws KeeperException, InterruptedException { createNodeName = zkClient.create("/apiTest1", "this is api test1 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("建立節點的返回值是:"+createNodeName); } /** * ls命令 * @throws KeeperException * @throws InterruptedException */ @Test public void testLs() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state" + watchedEvent.getState() + ";type:" + watchedEvent.getType()); } }); for (int i = 0; i < children.size(); i++) { System.out.println("ls / 資料:"+children.get(i)); } } /** * set命令 * @throws KeeperException * @throws InterruptedException */ @Test public void testSet() throws KeeperException, InterruptedException { Stat stat = zkClient.setData(createNodeName, ("this is update data! "+System.currentTimeMillis()).getBytes(), -1);//-1表示讓系統維護version System.out.println("set 返回資料::"+stat); } /** * del命令 * @throws KeeperException * @throws InterruptedException */ @Test public void testDel() throws KeeperException, InterruptedException { zkClient.delete(createNodeName,-1); System.out.println("success to del "+createNodeName+" Znode!"); } /** * get * @throws KeeperException * @throws InterruptedException */ @Test public void testGet() throws KeeperException, InterruptedException { byte[] data = zkClient.getData(createNodeName, false, null); System.out.println("節點"+createNodeName+"的資料是:"+new String(data)); } /** * 測試watch * @throws KeeperException * @throws InterruptedException */ @Test public void testWatch() throws KeeperException, InterruptedException { //先重置zkClient的watch物件(也可以在例項化zkClient時指定) zkClient.register(watcher); //進行監聽 byte[] data = zkClient.getData(createNodeName, true, null); System.out.println("獲得節點"+createNodeName+"的資料是:"+new String(data)); //第一次進行修改,觸發watch this.testSet(); //第二次進行修改,觸發watch this.testSet(); } /** * 關閉連線 * @throws InterruptedException */ @After public void close() throws InterruptedException { zkClient.close(); } /**定義watch物件*/ private Watcher watcher = new Watcher() { public int watchCount = 0; //記錄監聽次數 @Override public void process(WatchedEvent watchedEvent) { System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType()); //迴圈重複監聽 try { zkClient.exists(watchedEvent.getPath(), true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } watchCount++; System.out.println("第 "+watchCount+" 次監聽到!"); } }; }

測試結果如下

  • 連線上叢集時
success to connect zk cluster!
  • ls
    類命令輸入:ls / 對應方法testLs()
ls / 資料:watchTest
ls / 資料:shunxu20000000001
ls / 資料:shunxu10000000000
ls / 資料:zookeeper
ls / 資料:a
  • create
    類命令輸入:create /apiTest1 'this is api test1 data' 對應方法testCreate
建立節點的返回值是:/apiTest10000000007
  • get
    類命令輸入:get /名稱 對應方法testGet()
節點/apiTest10000000007的資料是:this is api test1 data
  • set
    類命令輸入:set /名稱 資料 對應方法testSet()
set 返回資料::4294967336,4294967345,1526852099431,1526852869041,1,0,0,0,20,0,4294967336

注,這些返回的資料就是節點的所有屬性,可以進去看下Stat類的定義:

public class Stat implements Record {
    private long czxid;
    private long mzxid;
    private long ctime;
    private long mtime;
    private int version;
    private int cversion;
    private int aversion;
    private long ephemeralOwner;
    private int dataLength;
    private int numChildren;
    private long pzxid;

    ....
  • delete
    類命令輸入:delete /名稱 對應方法testDel()
success to del /apiTest10000000007 Znode!

測試Watch(迴圈監聽器)

邏輯就是對某個路徑在本次監聽完後主動用簡單(exist()方法)的呼叫再次註冊監聽

具體方法:

(注,這只是一個測試類,故直接把Watch匿名類物件直接例項化,不太友好,實際使用時應通過實現Watch介面,創造一個Watch實現來,從而獲得watcher物件來使用)

   /**
     * 測試watch
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void testWatch() throws KeeperException, InterruptedException {

       //先重置zkClient的watch物件(也可以在例項化zkClient時指定)
        zkClient.register(watcher);
        //進行監聽
        byte[] data = zkClient.getData(createNodeName, true, null);
        System.out.println("獲得節點"+createNodeName+"的資料是:"+new String(data));

        //第一次進行修改,觸發watch
        this.testSet();

        //第二次進行修改,觸發watch
        this.testSet();
    }


    /**定義watch物件*/
    private Watcher watcher = new Watcher() {
        public int watchCount = 0;  //記錄監聽次數

        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());

            //迴圈重複監聽
            try {
                zkClient.exists(watchedEvent.getPath(), true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            watchCount++;
            System.out.println("第 "+watchCount+" 次監聽到!");
        }
    };

測試結果:

獲得監聽事件,path:null;state:SyncConnected;typeNone
獲得節點/apiTest10000000007的資料是:this is update data! 1532258475593
獲得監聽事件,path:/apiTest10000000007;state:SyncConnected;typeNodeDataChanged
set 返回資料::4294967352,4294967398,1526853419126,1526855521206,21,0,0,0,34,0,42949673521 次監聽到!
獲得監聽事件,path:/apiTest10000000007;state:SyncConnected;typeNodeDataChanged
set 返回資料::4294967352,4294967399,1526853419126,1526855521216,22,0,0,0,34,0,42949673522 次監聽到!

注:第一個 獲得監聽事件 是客戶端連線伺服器的自動監聽。