1. 程式人生 > >Zookeeper分散式及客戶端Curator的API簡單使用

Zookeeper分散式及客戶端Curator的API簡單使用

最近公司專案中使用了分散式Zookeeper及Dubbo,為了弄清楚這些框架在專案中的使用,在我業餘時間中學習了一些Zookeeper的簡單用法,分享出來,若有不足之處,望大家給與建議......

一、什麼是分散式系統?

我的理解:將原有的系統拆分為多個子系統組成一個龐大的系統,這個龐大系統對使用者不透明;可以分為3點介紹:

  1. 很多臺計算機組成一個整體,一個整體一致對外並且處理同一請求;
  2. 內部的每臺計算機都可以相互通訊(request,response);
  3. 客戶端到伺服器的一次請求到響應結束會經歷多臺計算機。

二、那什麼是Zookeeper呢?Zookeeper在分散式中有什麼作用呢?有什麼優勢?

Zookeeper是一個高可用的分散式資料管理與系統協調框架,主要是用來維護和監控儲存資料的狀態變化,通過監控這些資料狀態的變化,從而達到基於資料的叢集管理;可分為點介紹:

      1.釋出與訂閱模型,Zookeeper可以將釋出節點上的資料提供給訂閱者,供訂閱者獲取,實現配置資訊的管理和更新;

      2.負載均衡,在分散式環境中,為了保證高可用性,通常同一個應用或同一個服務的提供方都會部署多份,達到對等服務。而消費者就需要在這些對等的伺服器中選擇一個來執行相關的業務邏輯,其中比較典型的是訊息中介軟體中的生產者,消費者負載均衡;Zookeeper通常來做到生產者、消費者的負載均衡;

      3.命令服務,Zookeeper中,客戶端可根據指定的名字來獲取資源或服務地址,提供者資訊等;

      4.分散式鎖,Zookeeper可以通過分散式鎖來保證資料的強一致性。(兩類:保持獨佔鎖,控制時序鎖)

特性:

  1. 一致性:資料一致性,資料按照順序分批入庫;
  2. 原子性:事務要麼成功要麼失敗,不會區域性化;
  3. 單一檢視:客戶端連線叢集中的任一ZK節點,資料都是一致的;
  4. 可靠性:每次對ZK的操作都會儲存在服務端;
  5. 實時性:客戶端可以讀取到ZK服務端的最新資料。

三、Zookeeper客戶端Curator簡單API詳解:

(1)例項化Zookeeper客戶端:

/**
 * 例項化Zookeeper客戶端Curator例項
 */
public class CuratorOperator {

    public CuratorFramework client = null;
    public static final String zkServerPath = "192.168.43.190:2181";

    /**
     * 例項化zk客戶端
     */
    public CuratorOperator() {
        /**
         * 同步建立zk示例,原生api是非同步的
         *
         * curator連結zookeeper的策略:ExponentialBackoffRetry
         * baseSleepTimeMs:初始sleep的時間
         * maxRetries:最大重試次數
         * maxSleepMs:最大重試時間
         */
//		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        /**
         * curator連結zookeeper的策略:RetryNTimes
         * n:重試的次數
         * sleepMsBetweenRetries:每次重試間隔的時間
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

        /**
         * curator連結zookeeper的策略:RetryOneTime
         * sleepMsBetweenRetry:每次重試間隔的時間
         */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);

        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();    //在其workspace命令空間中操作ZK中節點
        client.start();
    }

    /**
     * @Description: 關閉zk客戶端連線
     */
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }
}

(2) 建立節點:

    /**
     * 建立節點
     * @throws Exception
     */
    public void createZKNode() throws Exception{
        // 例項化
        CuratorOperator cto = new CuratorOperator();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        // 建立節點(creatingParentsIfNeeded()可以遞迴建立節點)
        String nodePath = "/super/directory";
        byte[] data = "superme".getBytes();
        cto.client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(nodePath, data);
    }

(3)更新刪除節點:

    /**
     * 更新刪除節點
     * @throws Exception
     */
    public void updateAndDeleteZKNode() throws Exception{
        // 例項化
        CuratorOperator cto = new CuratorOperator();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
        String nodePath = "/super/directory";

        //更新節點資料
        byte[] newData = "newData".getBytes();
        cto.client.setData().withVersion(1).forPath(nodePath, newData);

        // 刪除節點
        cto.client.delete()
                .guaranteed()					// 如果刪除失敗,那麼在後端還是繼續會刪除,直到成功
                .deletingChildrenIfNeeded()	// 如果有子節點,遞迴刪除(原生API是不支援的)
                .withVersion(2)
                .forPath(nodePath);
    }

(4)讀取節點及子節點的內容:

    /**
     * 獲取當前節點及子節點內容
     */
    public void getParentAndChildNodeContent() throws Exception{
        // 例項化
        CuratorOperator cto = new CuratorOperator();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
        String nodePath = "/super/directory";

        // 判斷節點是否存在,如果不存在則為空
        Stat statExist = cto.client.checkExists().forPath(nodePath);
        System.out.println(statExist);

        // 讀取節點資料
        Stat stat = new Stat();
        //storingStatIn(stat)即為獲取節點資訊的同時獲取節點的狀態資訊並存儲
        byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("節點" + nodePath + "的資料為: " + new String(data));
        System.out.println("該節點的版本號為: " + stat.getVersion());


        // 查詢當前路徑下的子節點(而不是遞迴子節點,只是子節點的第一層)
        List<String> childNodes = cto.client.getChildren().forPath(nodePath);
        System.out.println("開始列印子節點:");
        childNodes.stream().forEach(System.out::println);
    }

(5) 只能對其ZK節點修改進行多次監控:

    /**
     * 只能對其節點的修改進行多次監控
     * @throws Exception
     */
    public void watchUpdateZKNode() throws Exception{
        // 例項化
        CuratorOperator cto = new CuratorOperator();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
        String nodePath = "/super/directory";

        /**
         *  為節點新增watcher
         *  當客戶端和服務端建立連線的時候,會將服務端zk中當前路徑下的節點資料快取到本地 NodeCache 中
         *  NodeCache: 建立一個快取節點,監聽資料節點的變更,會觸發事件
         */
        final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
        // buildInitial : 初始化的時候獲取該路徑下node的值並且快取到本地
        nodeCache.start(true);
        if (nodeCache.getCurrentData() != null) {
            System.out.println("節點初始化資料為:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("節點初始化資料為空...");
        }

        /**
         * 這樣就可以對其節點改變進行多次監聽
         */
        nodeCache.getListenable().addListener(() -> {
                    //先要進行nodeCache節點判空操作,防止該節點已被刪除過後進行其他操作,報空指標異常
                    if (nodeCache.getCurrentData() == null) {
                        System.out.println("空");
                        return;
                    }
                    String data = new String(nodeCache.getCurrentData().getData());
                    System.out.println("節點路徑:" + nodeCache.getCurrentData().getPath() + "資料:" + data);
                }
        );
    }

(6)對其ZK節點的增刪改都可進行多次監控:

   public final static String ADD_PATH = "/super/directory/child1";

    /**
     * 可以多次對其ZK節點增刪改進行監控
     */
    public void multiWatchZkNodeADUNode() throws Exception{
        // 例項化
        CuratorOperator cto = new CuratorOperator();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
        String nodePath = "/super/directory";

        /**
         *  若要監聽的節點是修改、刪除、新增操作,nodeCacheListener是不支援這種操作的;
         *  可以使用PathChildrenCache進行當前節點的父節點進行監聽,可達到同樣的效果;
         *  為子節點新增watcher,PathChildrenCache: 監聽資料節點的增刪改,會觸發事件
         */
        String childNodePathCache = nodePath;
        // cacheData: 設定快取節點的資料狀態
        final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件
         * NORMAL:非同步初始化,不會觸發事件
         * BUILD_INITIAL_CACHE:同步初始化,可以直接獲取當前節點所有子節點列表資料(非同步方式不可以)
         */
        childrenCache.start(StartMode.POST_INITIALIZED_EVENT);


        /**
         * BUILD_INITIAL_CACHE,同步監聽才可獲取子節點列表中的資料
         */
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("當前資料節點的子節點資料列表:");
        for (ChildData cd : childDataList) {
            String childData = new String(cd.getData());
            System.out.println(childData);
        }

        /**
         * 非同步監聽當前節點,可轉換為監聽當前節點的父節點,對其子節點的增刪改查操作,可達到同樣的效果
         */
        childrenCache.getListenable().addListener((child, event) -> {
                    if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                        System.out.println("子節點初始化ok...");
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                        String path = event.getData().getPath();
                        if (path.equals(ADD_PATH)) {
                            System.out.println("新增子節點:" + event.getData().getPath());
                            System.out.println("子節點資料:" + new String(event.getData().getData()));
                        } else if (path.equals("/super/directory/child2")) {
                            System.out.println("新增不正確...");
                        }
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                        System.out.println("刪除子節點:" + event.getData().getPath());
                    } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                        System.out.println("修改子節點路徑:" + event.getData().getPath());
                        System.out.println("修改子節點資料:" + new String(event.getData().getData()));
                    }
                }
        );
    }

四、總結:

(1)相比原生Zookeeper的API,Curator客戶端可以解決Watcher註冊一次就失效的問題;

(2)API簡單易用,提供了更多的解決方案且實現簡單,並且還支援遞迴的建立節點;

(3)Zookeeper可以更好的服務於Dubbo框架,ZK負責儲存了服務提供方和服務消費方的的URI(dubbo自定義的一種URI),服務消費方找到zookeeper,向zookeeper要到服務提供方的URI,然後就找到提供方,並呼叫提供方的服務;框架中要完成排程必須要有一個分散式的註冊中心,儲存所有服務的元資料。