1. 程式人生 > >Zookeeper學習(三) 客戶端和原生API

Zookeeper學習(三) 客戶端和原生API

safe ima call proc string 過程 心跳 current catch

前言

在這篇博客裏我會主要總結下兩個部分的操作:

  1. 在安裝ZooKeeper的機器上利用ZKClient連接Zookeeper的集群,然後利用相應的命令做一些簡單的操作。相信很多沒有接觸過Zookeeper的同學對第一篇簡介裏的哪些ZNode等等一些概念其實不是那麽清楚,但是經過實際操作後會深入了解許多。
  2. 簡單介紹下做的一個小demo,介紹了一下對Zookeeper原生API的使用。我們實際項目中用的是Curator的接口,但是原生API是根本。順便也會提一下我們利用Zookeeper做的事-簡單的服務註冊和分布式鎖。

利用ZKClient操作Zookeeper

首先進入Zookeeper的bin目錄,然後用Zookeeper的client連接服務器端:

cmd: zkCli.sh -server localhost:2181

這裏我沒有配置成環境變量,所有在當前目錄下執行sh的命令會有一點點不一樣。

技術分享圖片

如果出現上面的結果表示已經連接上了,可以看到,目前我們還沒有設置任何watch。

然後我們可以進行一系列的簡單操作:

cmd: ls /

  1. 查看當前Zookeeper服務端中根節點的子節點:

技術分享圖片

可以看到只有Zookeeper默認的節點。

  1. 創建新節點:

    cmd:create /yourNode yourNodeData

技術分享圖片

  1. 查看節點內容:

    get /yourNode

技術分享圖片

  1. 設置節點內容:

    set /yourNode yourNodeData

技術分享圖片

  1. 刪除節點:

    delete /yourNode

技術分享圖片

好啦,差不多就這些操作了!賊簡單哈哈,但是雖然看起來簡單,但是根據不同的場景去使用Zookeeper的一些特性還是需要仔細考慮的。接下來總結下原生API的基本使用。

原生API的使用

Zookeeper類是使用Zookeeper API時的核心類。使用ZK服務時,程序中必須創建一個Zookeeper實例,而且一旦與服務端建立了連接,那麽Zookeeper服務端將會給這次會話分配一個Session id,而客戶端和服務端將會通過心跳來維持會話的連接。

關於Zookeeper類的方法可以查看:

3.4.11 Zookeeper API

主要有以下幾個方法:create, delte, exists, setData, getData, getChildren。其作用通過函數名都可以知道,其各種重載類型建議看一下API裏的介紹,很詳細。

之前為了在博客上能較為簡單的說明原生API的使用寫了一個簡單的demo。demo是Zookeeper API和Spring MVC結合使用的,這裏把主要的代碼貼一下:

package com.webex.Boostrap;

import org.apache.zookeeper.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Created by sigong on 2018/6/20.
 */
@Component
public class ZookeeperConfig  implements Watcher {

        @Value("${ID}")
        private String id;

        @Value("${ZOOkEEPER.SERVER_LIST}")
        private String serverList;

        @Value("${ZOOkEEPER.NODEPATH}")
        private String nodePath;

        private ZooKeeper zk;
        private CountDownLatch connectedSemaphore=new CountDownLatch(1);

        private List<String> nodeList = new ArrayList<>();

        public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                        connectedSemaphore.countDown();
                        System.out.println("Call back: " + watchedEvent.getState());
                }
        }

        @PostConstruct
        private void connect() throws Exception{
                zk = new ZooKeeper(serverList, 5000, this);
                connectedSemaphore.await();

                System.out.println(zk.getState());

                if(!zk.getState().isConnected()){
                        return ;
                }

                if(zk.exists(nodePath,  false) == null){
                        zk.create(nodePath, "Kirk test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        System.out.println("create path: " + nodePath);
                }

                zk.create(nodePath + "/" + id, "node test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println("create 2nd path: " + nodePath + "/" + id);

                nodeList = zk.getChildren(nodePath, true);
                System.out.println("children: " + nodeList);

                System.out.println(zk.getSessionId());

                System.out.println("Data: " + zk.getChildren(nodePath, true));

        }

        public void close(){
                try {
                        if(zk != null){
                                zk.close();
                        }
                        connectedSemaphore.countDown();
                }catch (InterruptedException e){
                        System.out.println("Close error: " + e.getMessage());
                }
        }

        public List<String> getNodeList() {
                return nodeList;
        }
}

ZookeeperConfig類實現了Watcher接口,這樣做的原因是給Zookeeper中的節點設置了watch之後,一旦相應的watch被觸發。那麽在設置watch時當做參數傳入的watch對象的process方法會被觸發。我這裏是為了簡單所以直接讓這個類實現了Watcher接口。如果不想這麽做也可以單獨做一個類實現Watcher接口,在設置watch的時候傳入就行了。

這裏我是想讓程序在開始的時候就鏈接Zookeeer所以給connect方法加了@PostConstruct註解。

在connect方法裏,通過zk = new ZooKeeper(serverList, 5000, this);實例化了zookeeper對象,客戶端會去連接serverList的zookeeper服務端,這裏的serverList是從配置文件中讀到的,配置為:ZOOKEEPER.SERVER_LIST=ip1:port1, ip2:port2, ip3:port3。其中ip和port為對應的zookeeper節點的ip和port。

if(zk.exists(nodePath,  false) == null){
                        zk.create(nodePath, "Kirk test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        System.out.println("create path: " + nodePath);
                }

這裏首先去看路徑為nodePath的zookeeper節點是否在zookeeper中存在了,如果不存在就建立一個PERSISTENT類型的節點,這種節點不會隨著session的消失而消失。

zk.create(nodePath + "/" + id, "node test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println("create 2nd path: " + nodePath + "/" + id);

在建立的PERSISTENT的節點下,每個程序運行的時候會去建立一個EPHEMERAL_SEQUENTIAL類型的節點。為什麽要這樣做???因為在分布式的環境下,多臺機器工作時,每臺機器註冊EPHEMERAL_SEQUENTIAL節點,那麽zookeeper會從小到大為每個進程生成一個獨特的id。如果進程掛掉,我們可以重啟來重新為此進程註冊一個節點。如果多臺機器是主從結構的,主進程掛掉後,我們也可以利用Zookeeper來選舉出新的主進程。這裏我們在項目中就用到了Zookeeper的這項功能。

在我們的項目中,簡單的描述,有成為A和B的兩種服務,其中A是負責分發,一些邏輯的組裝,而B是負責執行的。我們的項目裏A是主從結構的,B是並行結構的,同時只有一個A節點在工作,而這個A節點會利用一些分發方法向所有現在可用的B節點發送任務,這樣我們需要做下面幾件事:

  1. A和B所有的節點在啟動時向Zookeeper註冊(不同節點下);
  2. 選出主A;
  3. A通過Zookeeper讀出所有可用的B;
  4. 主A掛掉後,其他的A通過選舉選出新的主A。

第一步,很簡單,我們可以建立類似/A,/B這樣的永久節點,並把A和B分別註冊EPHEMERAL_SEQUENTIAL節點;

第二步,選主A的過程其實就是利用Zookeeper實現分布式鎖的過程,我們采用的方法是把/A下SEQUENTIAL值最小的A節點作為主A;

第三步,通過getchildren就可以獲得目前所有可用的B;

最後,其實就是把第二步重新來一遍。

總結

Zookeeper的client操作和基本API的使用還是挺簡單的,但是自己還是要多去想怎麽更好的使用它並且要最好深入了解內部的一些機制和缺陷等。最近要開始看ZK的源碼了,希望之後順利~

Zookeeper學習(三) 客戶端和原生API