1. 程式人生 > >zookeeper客戶端api操作

zookeeper客戶端api操作

這裡記錄zookeeper java客戶端api的使用。

客戶端建立Zookeeper例項,然後呼叫這個類提供的方法與zookeeper伺服器進行互動。 Zookeeper的建構函式有如下4種:

ZooKeeper(connectString, sessionTimeout, watcher);
ZooKeeper(connectString, sessionTimeout, watcher,canBeReadOnly);
ZooKeeper(connectString, sessionTimeout, watcher, sessionId, sessionPasswd);
ZooKeeper
(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, canBeReadOnly);

引數說明: connectString: 連線字串 例如 “127.0.0.1:2181” sessionTimeout: 會話超時時間 以毫秒為單位的整型值 在sessionTimeout時間內服務端與客戶端沒有有效的心跳檢測 則會話失效 watcher: 預設的事件通知處理器 sessionId: 會話ID sessionPasswd: 會話祕鑰 canBeReadOnly: 是否是隻讀

String create(String path, byte[] data, List acl,CreateMode createMode) 建立一個給定的目錄節點 path, 並給它設定資料,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點儲存的資料不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名;EPHEMERAL:臨時目錄節點,一旦建立這個節點的客戶端與伺服器埠也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點

Stat exists(String path, boolean watch) 判斷某個 path 是否存在,並設定是否監控這個目錄節點,這裡的 watcher 是在建立 ZooKeeper 例項時指定的 watcher,exists方法還有一個過載方法,可以指定特定的watcher

Stat exists(String path,Watcher watcher) 過載方法,這裡給某個目錄節點設定特定的 watcher,Watcher 在 ZooKeeper 是一個核心功能,Watcher 可以監控目錄節點的資料變化以及子目錄的變化,一旦這些狀態發生變化,伺服器就會通知所有設定在這個目錄節點上的 Watcher,從而每個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而做出相應的反應

void delete(String path, int version) 刪除 path 對應的目錄節點,version 為 -1 可以匹配任何版本,也就刪除了這個目錄節點所有資料

ListgetChildren(String path, boolean watch) 獲取指定 path 下的所有子目錄節點,同樣 getChildren方法也有一個過載方法可以設定特定的 watcher 監控子節點的狀態

Stat setData(String path, byte[] data, int version) 給 path 設定資料,可以指定這個資料的版本號,如果 version 為 -1 怎可以匹配任何版本

byte[] getData(String path, boolean watch, Stat stat) 獲取這個 path 對應的目錄節點儲存的資料,資料的版本等資訊可以通過 stat 來指定,同時還可以設定是否監控這個目錄節點資料的狀態

void addAuthInfo(String scheme, byte[] auth) 客戶端將自己的授權資訊提交給伺服器,伺服器將根據這個授權資訊驗證客戶端的訪問許可權。

Stat setACL(String path,List acl, int version) 給某個目錄節點重新設定訪問許可權,需要注意的是 Zookeeper 中的目錄節點許可權不具有傳遞性,父目錄節點的許可權不能傳遞給子目錄節點。目錄節點 ACL由兩部分組成:perms 和 id。Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 幾種,而 id 標識了訪問目錄節點的身份列表,預設情況下有以下兩種:ANYONE_ID_UNSAFE = new Id(“world”, “anyone”) 和 AUTH_IDS = new Id(“auth”, “”) 分別表示任何人都可以訪問和建立者擁有訪問許可權。

List getACL(String path,Stat stat) 獲取某個目錄節點的訪問許可權列表

下面通過一個配置JDBC的url、username、password,並從zookeeper讀取的示例來說明 配置類:

/**
 * Created by j.tommy on 2017/9/8.
 */
public class ZkTest1 {
    private final static String ZK_CONNECTION_STR = "192.168.74.125:2181";
    private final static int SESSION_TIMEOUT = 30000;
    private final static String AUTH_TYPE = "digest";
    private final static String AUTH_ID_PWD = "admin:123456";
    private final static String ZK_ROOT = "/dbConf";
    private final static String ZK_URL = ZK_ROOT + "/url";
    private final static String ZK_USERNAME = ZK_ROOT + "/username";
    private final static String ZK_PWD = ZK_ROOT + "/password";
    private ZooKeeper zk = null;
    public ZkTest1() {
        getZK();
    }
    public ZooKeeper getZK() {
        try {
            zk = new ZooKeeper(ZK_CONNECTION_STR, SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent);
                }
            });
            while (zk.getState() != ZooKeeper.States.CONNECTED) {
                Thread.sleep(1000L);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }
    private void createNode(String path,byte[] value,String idPassword) {
        // 指定認證模式為digest
        Id id = new Id(AUTH_TYPE,idPassword);
        // 建立的節點有所有許可權
        ACL acl = new ACL(ZooDefs.Perms.ALL,id);
        try {
            zk.create(path, value, Collections.singletonList(acl), CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private String getNodeValue(String path,String idPassword) {
        try {
            // 由於建立節點時指定了許可權,所以這裡必須設定許可權才能查詢
            // 注意:這裡auth是不用加密的。
            zk.addAuthInfo(AUTH_TYPE,AUTH_ID_PWD.getBytes());
            return new String(zk.getData(path,false,null));
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    private void closeZK() {
        if (null != zk) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        ZkTest1 zkTest1 = new ZkTest1();
        try {
            String idPassword = DigestAuthenticationProvider.generateDigest(AUTH_ID_PWD);
            // 建立根節點
            zkTest1.createNode(ZK_ROOT,"abc".getBytes(),idPassword);
            String rootValue = zkTest1.getNodeValue(ZK_ROOT,idPassword);
            if (StringUtils.isNotEmpty(rootValue)) {
                System.out.println(ZK_ROOT + "節點建立成功!value=" + rootValue);
            }
            // 建立url節點
            zkTest1.createNode(ZK_URL,"10.10.1.19".getBytes(),idPassword);
            String urlValue = zkTest1.getNodeValue(ZK_URL,idPassword);
            if (StringUtils.isNotEmpty(urlValue)) {
                System.out.println(ZK_URL + "節點建立成功!value=" + urlValue);
            }
            // 建立username節點
            zkTest1.createNode(ZK_USERNAME,"root".getBytes(),idPassword);
            String usernameValue = zkTest1.getNodeValue(ZK_USERNAME,idPassword);
            if (StringUtils.isNotEmpty(urlValue)) {
                System.out.println(ZK_USERNAME + "節點建立成功!value=" + usernameValue);
            }
            // 建立password節點
            zkTest1.createNode(ZK_PWD,"123456".getBytes(),idPassword);
            String pwdValue = zkTest1.getNodeValue(ZK_PWD,idPassword);
            if (StringUtils.isNotEmpty(pwdValue)) {
                System.out.println(ZK_PWD + "節點建立成功!value=" + pwdValue);
            }
            zkTest1.closeZK();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }
}

讀取zookeeper配置的類

/**
 * Created by j.tommy on 2017/9/8.
 */
public class ZKTest2 {
    private final static String ZK_CONNECTION_STR = "192.168.74.125:2181";
    private final static int SESSION_TIMEOUT = 30000;
    private final static String ZK_ROOT = "/dbConf";
    private final static String ZK_URL_PATH = ZK_ROOT+"/url";
    private final static String ZK_USERNAME_PATH = ZK_ROOT+"/username";
    private final static String ZK_PASSWD_PATH = ZK_ROOT+"/password";
    private final static String AUTH_TYPE = "digest";
    private final static String AUTH_PASSWD = "admin:abc1";
    private String url;
    private String username;
    private String password;
    private ZooKeeper zk = null;
    private ZooKeeper getZK() throws IOException, InterruptedException {
        zk = new ZooKeeper(ZK_CONNECTION_STR,SESSION_TIMEOUT,new MyWatcher());
        while (zk.getState() != ZooKeeper.States.CONNECTED) {
            Thread.sleep(1000L);
        }
        zk.addAuthInfo(AUTH_TYPE,AUTH_PASSWD.getBytes());
        return zk;
    }
    private void getZKValue() {
        try {
            this.url = new String(zk.getData(ZK_URL_PATH,true,null));
            this.username = new String(zk.getData(ZK_USERNAME_PATH,true,null));
            this.password = new String(zk.getData(ZK_PASSWD_PATH,true,null));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Event.EventType eventType = watchedEvent.getType();
            if (eventType == Event.EventType.None) {
                System.out.println("伺服器連線成功");
            }
            else if (eventType == Event.EventType.NodeCreated) {
                System.out.println("節點建立成功");
            }
            else if (eventType == Event.EventType.NodeDataChanged) {
                System.out.println("資料修改成功");
                getZKValue();
            }
            else if (eventType == Event.EventType.NodeDeleted) {
                System.out.println("節點刪除成功");
            }
            else if (eventType == Event.EventType.NodeChildrenChanged) {
                System.out.println("子節點資料修改成功");
                getZKValue();
            }
        }
    }
    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public static void main(String[] args) throws InterruptedException, IOException, NoSuchAlgorithmException {
        ZKTest2 zkTest2 = new ZKTest2();
        ZooKeeper zk = zkTest2.getZK();
        String auth = DigestAuthenticationProvider.generateDigest(AUTH_PASSWD);
        System.out.println("auth:" +auth);
        int loopCount = 10;
        int i=0;
        while (i < loopCount) {
            zkTest2.getZKValue();
            System.out.println("url:" + zkTest2.getUrl());
            System.out.println("username:" + zkTest2.getUsername());
            System.out.println("password:" + zkTest2.getPassword());
            System.out.println("--------------------------------------");
            i++;
            Thread.sleep(5000L);
        }
        zk.close();
    }
}

程式碼中,建立節點時使用的是digest認證,id和password為admin:123456,但讀取時為admin:abc1. 這個時候執行程式是會出錯的,如下: 提示沒有許可權,將讀取配置類中id和password修改為與建立節點的一樣(admin:123456),再次執行。

建立節點時id與password是要經過加密的,即DigestAuthenticationProvider.generateDigest(AUTH_PASSWD);但是驗證時不需要。 同時在zookeeper的客戶端中執行命令建立節點時,使用明文密碼時可以建立成功的,但沒法驗證成功。

zookeeper客戶端中,使用$ZOOKEEPER_HOME/bin/zkCli.sh -server zookeeper伺服器IP:埠連線zookeeper伺服器。 輸入help檢視所有命令: