1. 程式人生 > >zookeeper原生客戶端

zookeeper原生客戶端

一:zookeeper常用客戶端

  • zookeeper:官方提供的,原生的api,使用起來比較麻煩,比較底層,不夠直接,不建議使用。
  • zkclient: 對原生api的封裝,開源專案(https://github.com/adyliu/zkclient),dubbo中使用的是這個。
  • Apache Curator:Apache的開源專案,對原生客戶端zookeeper進行封裝,易於使用, 功能強大, 一般都是使用這個框架。

二:zookeeper原生客戶端

<dependency>
    <groupId>org.apache.zookeeper</groupId
>
<artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
public enum CreateMode {
     /** 持久節點 */
    PERSISTENT(0, false, false),
    /** 持久順序節點 */
    PERSISTENT_SEQUENTIAL(2, false, true),
    /** 臨時節點(本次會話有效,會話結束後會自動刪除) */
    EPHEMERAL(1, true
, false), /** 臨時順序節點 */ EPHEMERAL_SEQUENTIAL(3, true, true); }
public class ZooKeeper {
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException;

    // 不存在返回null
    public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException;

    /**
    * 重複建立同一個節點會拋異常
    * 建立子節點必選先保證父節點已經建立好了
    */
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException; public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException; public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException; public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException; /** * 只能刪除一個節點,不支援遞迴的刪除某個路徑 * 刪除的節點不能包含子節點 */ public void delete(String path, int version) throws InterruptedException, KeeperException; // 新增認證資訊(類似於密碼) public void addAuthInfo(String scheme, byte[] auth); // ACL(Access Control List)設定節點訪問許可權列表,每個節點都可以設定訪問許可權,指定只有特定的客戶端才能訪問和操作節點。 public Stat setACL(String path, List<ACL> acl, int aclVersion) throws KeeperException, InterruptedException; public List<ACL> getACL(String path, Stat stat) throws KeeperException, InterruptedException; public synchronized void close() throws InterruptedException; }
public class ZooDefs {
    public interface Ids {
        Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
        Id AUTH_IDS = new Id("auth", "");

        /** 這是一個完全開放的許可權,所有客戶端都有許可權 */
        ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
        /** 只有建立節點的客戶端才有所有許可權 */
        ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
        /** 所有客戶端只有讀取的 */
        ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
    }
}    

EventType

public static enum EventType {
    None(-1),
    NodeCreated(1),
    NodeDeleted(2),
    NodeDataChanged(3),
    NodeChildrenChanged(4);
}

KeeperState

public static enum KeeperState {
    Disconnected(0),
    SyncConnected(3),
    AuthFailed(4),
    ConnectedReadOnly(5),
    SaslAuthenticated(6),
    Expired(-112);
}

注意:程式不能debug,只能run,如果debug會報錯org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /app1
原因是debug造成程式停止執行,導致會話過期

public class ZookeeperTest {
    /** zookeeper地址 */
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    /** session超時時間 */
    static final int sessionTimeout = 5000;
    /** 阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
    static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 事件狀態
                Event.KeeperState keeperState = watchedEvent.getState();
                if (Event.KeeperState.SyncConnected == keeperState) {
                    // 事件型別
                    Event.EventType eventType = watchedEvent.getType();
                    if (Event.EventType.None == eventType) {
                        // 連線成功建立,傳送訊號量,讓後續阻塞程式向下執行
                        countDownLatch.countDown();
                        System.out.println("連線成功:" + watchedEvent);
                    }
                }
            }
        };
        // 注意:ZooKeeper客戶端和伺服器會話的建立是一個非同步的過程,也就是說程式方法在處理完客戶端初始化後立即返回
        // 也就是說可能並沒有真正構建好一個可用的會話,只有會話宣告週期處於SyncConnected時才算真正建立好會話
        // 這也是為什麼要使用CountDownLatch來等待連線成功
        ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
        // 新增認證資訊
        // digest:是最常見的許可權控制模式,也更符合我們對許可權控制的認證,類似於使用者名稱和密碼的方式,zk會對形成的許可權標識先後進行兩次編碼處理,分別是SHA-1加密演算法、Base64編碼
        zooKeeper.addAuthInfo("digest", "123456".getBytes());
        // 如果沒有連線成功則進行阻塞,直到連線成功才繼續往下執行,連線時需要時間的,可能不會立即連線成功,肯能會等一兩秒之後才連線成功,
        countDownLatch.await();


        String app1Node = "/app1";
        try {
            Stat app1Stat = zooKeeper.exists(app1Node, false);
            if (app1Stat == null) {
                List<ACL> acls = new ArrayList<>(1);
                for (ACL acl : ZooDefs.Ids.CREATOR_ALL_ACL) {
                    acls.add(acl);
                }

                // 建立節點(如果節點已存在則丟擲異常),返回節點名稱
                // 該節點需要認證,只有認證的其它zk客戶端才能操作該節點
                String app1NodePath = zooKeeper.create(app1Node, "app1".getBytes(), acls, CreateMode.PERSISTENT);
                System.out.println("app1NodePath=" + app1NodePath);

                Stat p1Stat = zooKeeper.exists(app1Node + "/p_1", false);
                if (p1Stat == null) {
                    // 建立子節點(父節點必須存在,父節點不存在則拋異常)
                    // CreateMode.EPHEMERAL 表示臨時節點,當會話結束後會立即刪除
                    zooKeeper.create(app1Node + "/p_1", "p_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    zooKeeper.create(app1Node + "/p_2", "p_2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    zooKeeper.create(app1Node + "/p_3", "p_3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                }
            }


            byte[] data = zooKeeper.getData(app1Node, false, null);
            String app1NodeValue = new String(data);
            System.out.println("app1NodeValue=" + app1NodeValue);

            zooKeeper.setData(app1Node, "my p_1".getBytes(), -1);

            List<String> children = zooKeeper.getChildren(app1Node, false);
            for (String path : children) {
                String fullPath = app1Node + "/" + path;
                String nodeValue = new String(zooKeeper.getData(fullPath, false, null));
                System.out.println(fullPath + "=" + nodeValue);
            }

            // version版本號 -1: 全刪除
            zooKeeper.delete(app1Node + "/p_2", -1);

            // Thread.sleep(10000);
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            zooKeeper.close();
        }
    }
}

這裡寫圖片描述

原生zookeeper客戶端使用注意:

  1. ZooKeeper建立連線時是非同步建立的,有可能已經開始呼叫客戶端方法了,連線還沒有完全建立好,所以在建立連線時一般將非同步建立客戶端變成同步建立客戶端

  2. session過期的問題: 在極端情況下,出現ZooKeeper session過期,客戶端需要自己去監聽該狀態並重新建立ZooKeeper例項

  3. 自動恢復(failover)的問題: 當client與一臺server的連線丟失,並試圖去連線另外一臺server時, client將回到初始連線模式

  4. 對可恢復異常的處理:當在server端建立一個有序ZNode,而在將節點名返回給客戶端時崩潰,此時client端丟擲可恢復的異常,使用者需要自己捕獲這些異常並進行重試

  5. 使用場景的問題:Zookeeper提供了一些標準的使用場景支援,但是ZooKeeper對這些功能的使用說明文件很少,而且很容易用錯.在一些極端場景下如何處理,zk並沒有給出詳細的文件說明.比如共享鎖服務,當伺服器端建立臨時順序節點成功,但是在客戶端接收到節點名之前掛掉了,如果不能很好的處理這種情況,將導致死鎖

三:Watcher示例

Watcher:監控者,監控某個節點對應的操作,也就是對某個節點操作時執行一下回調函式,Watcher#process()

如果某個動作(操作節點)需要進行監控需要設定某個動作的watch=true或者設定exists(path, true)。注意執行一次動作和一次watch繫結,如果想每次都需要監控,那麼每次動作都必須線上設定監控

public class ZookeeperWatcher implements Watcher {
    /** zookeeper地址 */
    private static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    /** session超時時間 */
    private static final int sessionTimeout = 5000;
    /** 阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    private AtomicInteger seq = new AtomicInteger();

    private ZooKeeper zooKeeper;

    private static final String PARENT_PATH = "/p";
    private static final String CHILDEREN_PATH = "/p/c";

    public void createConnection(String connectString, int sessionTimeout){
        try {
            this.releaseConnection();
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
            countDownLatch.await();
        } catch (Exception e) {

        }
    }

    public void releaseConnection() {
        if (this.zooKeeper == null) return;

        try {
            this.zooKeeper.close();
            System.out.println("斷開連線");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public boolean createPath(String path, String data, boolean needWatch) {
        try {
            // 設定監控(由於zookeeper的監控都是一次性的,所以每次都需要重新設定監控)
            System.out.println("新增節點");
            this.zooKeeper.exists(path, needWatch);
            this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("[" + Thread.currentThread().getName() +  "] 節點建立成功 path=" + path + ", content=" + data);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }

    public void writeData(String path, String data) {
        System.out.println("修改節點");
        try {
            this.zooKeeper.setData(path, data.getBytes(), -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public List<String> getChildren(String path, boolean needWatch) {
        try {
            System.out.println("獲取子節點");
            return this.zooKeeper.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public void deleteAllTestPath(boolean needWatch) {
        if (this.reSetWatch(CHILDEREN_PATH) != null) {
            this.deleteNode(CHILDEREN_PATH);
        }

        if (this.reSetWatch(PARENT_PATH) != null) {
            this.deleteNode(PARENT_PATH);
        }
    }

    public void deleteNode(String path) {
        try {
            System.out.println("刪除節點:" + path);
            this.zooKeeper.delete(path, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public Stat reSetWatch(String path){
        try {
            // 重新設定監控
            return this.zooKeeper.exists(path, true);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("process watchedEvent=" + watchedEvent);
        if (watchedEvent == null) return;
        Event.KeeperState keeperState = watchedEvent.getState();
        Event.EventType eventType = watchedEvent.getType();
        String path = watchedEvent.getPath();

        String prefix = "[Watcher(" + Thread.currentThread().getName() + ") -" + this.seq.incrementAndGet() + "] path=" + path + "\t";
        System.out.println(prefix + "連線狀態:" + keeperState);
        System.out.println(prefix + "事件:" + eventType);

        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                System.out.println(prefix + "成功連線上ZK伺服器 》》》》》");
                countDownLatch.countDown();
            } else if (Event.EventType.NodeCreated == eventType) {
                System.out.println(prefix + "建立節點");
                this.sleep(100);
            } else if (Event.EventType.NodeDataChanged == eventType) {
                System.out.println(prefix + " 修改節點");
                this.sleep(100);
            } else if (Event.EventType.NodeChildrenChanged == eventType) {
                System.out.println(prefix + " 修改子節點");
                this.sleep(3000);
            } else if (Event.EventType.NodeDeleted == eventType) {
                System.out.println(prefix + " 刪除子節點 " + path);
                this.sleep(3000);
            }
        } else if (Event.KeeperState.Disconnected == keeperState) {
            System.out.println(prefix + " 連線斷開 XXXXX");
        } else if (Event.KeeperState.AuthFailed == keeperState) {
            System.out.println(prefix + " 許可權檢查失敗");
        } else if (Event.KeeperState.Expired == keeperState) {
            System.out.println(prefix + " 會話過期");
        }

        System.out.println("--------------------------------------------------------------------");
    }

    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ZookeeperWatcher zkWatcher = new ZookeeperWatcher();
        // 第一次觸發watcher 建立連線會觸發
        zkWatcher.createConnection(connectString, sessionTimeout);
        Thread.sleep(1000);

        // true:表示需要監控, 會觸發第二次watcher
        if (zkWatcher.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {
            Thread.sleep(1000);

            // 重新建立監控(每次監控完就會斷開,下次要想還監控必須手動再次設定監控)
            zkWatcher.reSetWatch(PARENT_PATH);

            // 第三次觸發監控(因監控是一次性的,createPath監控用完了就斷開監控了,而上面又再次建立了監控,所以writeData能被監控到)
            zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "");

            // 監控父節點(當下面的程式碼執行時,父節點增加了一個子節點也屬於父節點發生了變化)
            zkWatcher.getChildren(PARENT_PATH, true);

            // 監控子節點,會觸發監控
            zkWatcher.createPath(CHILDEREN_PATH, System.currentTimeMillis() + "", true);

            // 觸發兩次監控
            zkWatcher.deleteAllTestPath(true);

        }

        zkWatcher.releaseConnection();

        Thread.sleep(10000);
    }
}

這裡寫圖片描述