1. 程式人生 > >Zookeeper 開源客戶端 ZkClient 版本 api介紹和示例

Zookeeper 開源客戶端 ZkClient 版本 api介紹和示例

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。

ZKClient版本及原始碼

maven依賴

ZkClient目前有兩個不同artifactId的系列。 
其中最早的0.1版本maven依賴如下:

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.9</version>
 </dependency>
 <dependency>
     <groupId>com.github.sgroschupf</groupId>
     <artifactId>zkclient</artifactId>
     <version>0.1</version>
 </dependency>

另外一個系列為的maven依賴為:

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.9</version>
 </dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

其中第二個系列包含了從0.1~0.10的版本。檢視dubbo的原始碼,我們可以看到,dubbo採用了第一個系列的0.1版本。

原始碼

ZkClient使用

以下我們以第二個系列的0.10版本為例來說明ZKClient的API和使用

建立會話

ZkClient提供了7中建立會話的方法:

public ZkClient(String serverstring)

public ZkClient(String zkServers, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

public ZkClient(IZkConnection connection)

public ZkClient(IZkConnection connection, int connectionTimeout)

public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

上面方法的引數如果我們熟悉原生API的話,不難理解其引數,基本上引數名都是自描述的。值得留意的是ZkClient將ZK原生API中的非同步處理進行了同步化。

其中一個引數IZkConnection是一個介面的定義。檢視介面的方法不難發現它是對ZK原生介面最直接的包裝。在此介面下面有兩個實現方法ZkConnection和InMemoryConnection。在日常中直接使用ZkConnection方法就可以解決大部分的常見業務需求。

引數ZkSerializer同樣是一個介面,定義了byte陣列序列化和反序列化的兩個方法。如果不傳遞此引數,則預設使用org.I0Itec.zkclient.serialize.SerializableSerializer實現類進行序列化。某些情況下此序列化介面會出現問題,比如亂碼。此時,開發者可以直接實現ZkSerializer介面,重寫自己的序列化方法。比如使用Hessian或Kryo等。

舉例:

String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";  
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 10000);

建立節點

ZkClient提供了15個建立節點的方法:

public void createPersistent(String path)

public void createPersistent(String path, boolean createParents)

public void createPersistent(String path, boolean createParents, List<ACL> acl)

public void createPersistent(String path, Object data)

public void createPersistent(String path, Object data, List<ACL> acl)

public String createPersistentSequential(String path, Object data)

public String createPersistentSequential(String path, Object data, List<ACL> acl) 

public void createEphemeral(final String path)

public void createEphemeral(final String path, final List<ACL> acl)

public String create(final String path, Object data, final CreateMode mode)

public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) 

public void create(final String path, Object data, final CreateMode mode, final AsyncCallback.StringCallback callback, final Object context)

public void createEphemeral(final String path, final Object data)

public void createEphemeral(final String path, final Object data, final List<ACL> acl)

public String createEphemeralSequential(final String path, final Object data)

public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)

檢視原始碼可知,其實這些建立節點的方法都是對原生API的一層封裝而已,底層實現基本相同。值得留意的一點是,原生API的引數通過byte[]來傳遞節點內容,而ZkClient支援自定義序列化,因此可以傳輸Object物件。

 該API方法的引數說明如下表所示。

引數名 說明
path 指定資料節點的節點路徑,即API呼叫的目的是建立該節點
data 節點的初始資料內容,可以傳入null
mode 節點型別,是一個列舉型別,通常有4種可選的節點型別
acl 節點的ACL策略
callback 註冊一個非同步回撥函式
context 用於傳遞一個物件,可以在執行回撥函式的時候使用
createParents 指定是否建立父節點

示例:

zkc.createEphemeral("/temp");
// 可以支援遞迴的建立,但是不能遞迴賦值
zkc.createPersistent("/super/c1", true);

刪除節點

刪除節點提供了以下方法:

public boolean delete(final String path)

public boolean delete(final String path, final int version)

public boolean deleteRecursive(String path)

刪除API其實很簡單,重點說一下deleteRecursive介面,這個介面提供了遞迴刪除的功能。在原生API中,如果一個節點存在子節點,那麼它將無法直接刪除,必須一層層遍歷先刪除全部子節點,然後才能將目標節點刪除。

示例:

刪除 /temp節點
zkc.delete("/temp");
//遞迴刪除/super
zkc.deleteRecursive("/super");

讀取節點

獲取節點列表

public List<String> getChildren(String path)

此介面返回子節點的相對路徑列表。比如節點路徑為/test/a1和/test/a2,那麼當path為/test時,返回的結果為[a1,a2]。

獲取節點內容

public <T extends Object> T readData(String path)

public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)

public <T extends Object> T readData(String path, Stat stat)

通過方法返回引數的定義,就可以得知,返回的結果(節點的內容)已經被反序列化成物件了。

獲取節點並返回內容示例:

List<String> list = zkc.getChildren("/super");
for(String p : list){
    System.out.println(p);
    String rp = "/super/" + p;
    String data = zkc.readData(rp);
    System.out.println("節點為:" + rp + ",內容為: " + data);
}

更新資料

更新操作可以通過以下介面來實現:

public void writeData(String path, Object object)

public void writeData(final String path, Object datat, final int expectedVersion)

public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)

示例:zkc.writeData("/super/c1", "新內容");

監測節點是否存在

此API比較簡單,呼叫以下方法即可:

protected boolean exists(final String path, final boolean watch)

綜合以上api 寫一個小栗子:

程式碼在 bjsxt.zkclient.base

public class ZkClientBase {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 10000;//ms


    public static void main(String[] args) throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
        //1. create and delete方法

        zkc.createEphemeral("/temp");
        zkc.createPersistent("/super/c1", true);
        Thread.sleep(10000);
        zkc.delete("/temp");
        zkc.deleteRecursive("/super");

        //2. 設定path和data 並且讀取子節點和每個節點的內容
        zkc.createPersistent("/super", "1234");
        zkc.createPersistent("/super/c1", "c1內容");
        zkc.createPersistent("/super/c2", "c2內容");
        List<String> list = zkc.getChildren("/super");
        for(String p : list){
            System.out.println(p);
            String rp = "/super/" + p;
            String data = zkc.readData(rp);
            System.out.println("節點為:" + rp + ",內容為: " + data);
        }

        //3. 更新和判斷節點是否存在
        zkc.writeData("/super/c1", "新內容");
        System.out.println(zkc.readData("/super/c1").toString());
        System.out.println(zkc.exists("/super/c1"));

//      4.遞迴刪除/super內容
        zkc.deleteRecursive("/super");
    }
}

輸出

c1
節點為:/super/c1,內容為: c1內容
c2
節點為:/super/c2,內容為: c2內容
新內容
true

註冊監聽

我們發現,上述ZKClient裡面並沒有類似的watcher、watch引數,這也就是說我們開發人員無需關心反覆註冊watcher的問題, 在ZkClient中可以通過註冊相關的事件監聽來實現對Zookeeper服務端時間的訂閱。其中ZkClient提供的監聽事件介面有以下幾種:

介面類 註冊監聽方法 解除監聽方法
IZkChildListener ZkClient的subscribeChildChanges方法 ZkClient的unsubscribeChildChanges方法
IZkDataListener ZkClient的subscribeDataChanges方法 ZkClient的subscribeDataChanges方法
IZkStateListener ZkClient的subscribeStateChanges方法 ZkClient的unsubscribeStateChanges方法

其中ZkClient還提供了一個unsubscribeAll方法,來解除所有監聽。

示例:

1 subscribeChildChanges方法 訂閱子節點變化

引數1: path路徑 
引數2:實現了IZKChildListener介面的類(如:例項化IZKClientListener類) 只需要重寫其handleChildChanges(String parentPath, List currentChild)方法。其中 
- parentPath 為監聽節點全路徑 
- currentChilds為新的子節點列表 
IZKChildListener事件說明針對於下面三個事件觸發: 
- 新增子節點 
- 減少子節點 
- 刪除節點 
注意: 不監聽節點內容的變化 
舉例

public class ZkClientWatcher1 {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 10000;//ms


    public static void main(String[] args) throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

        //對父節點新增監聽子節點變化。
        zkc.subscribeChildChanges("/super", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath: " + parentPath);
                System.out.println("currentChilds: " + currentChilds);
            }
        });

        Thread.sleep(3000);

        zkc.createPersistent("/super");
        Thread.sleep(1000);

        zkc.createPersistent("/super" + "/" + "c1", "c1內容");
        Thread.sleep(1000);

        zkc.createPersistent("/super" + "/" + "c2", "c2內容");
        Thread.sleep(1000);     

        zkc.delete("/super/c2");
        Thread.sleep(1000); 

        zkc.deleteRecursive("/super");
        Thread.sleep(Integer.MAX_VALUE);

    }
}

輸出

parentPath: /super
currentChilds: []
parentPath: /super
currentChilds: [c1]
parentPath: /super
currentChilds: [c1, c2]
parentPath: /super
currentChilds: [c1]
parentPath: /super
currentChilds: null
parentPath: /super
currentChilds: null
  • 2 subscribeDataChanges 訂閱內容變化

和前面的subscribeChildChanges類似 
- 引數1 路徑 
- 引數2 IZkDataListener物件,重寫handleDataDeleted(String path) 方法,可以得到刪除節點的path 重寫handleDataChange(String path, Object data)可以得到變更的節點和變更的內容

舉例

public class ZkClientWatcher2 {

    /** zookeeper地址 */
    static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
    /** session超時時間 */
    static final int SESSION_OUTTIME = 10000;//ms


    public static void main(String[] args) throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

        zkc.createPersistent("/super", "1234");

        //對父節點新增監聽子節點變化。
        zkc.subscribeDataChanges("/super", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String path) throws Exception {
                System.out.println("刪除的節點為:" + path);
            }

            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println("變更的節點為:" + path + ", 變更內容為:" + data);
            }
        });

        Thread.sleep(3000);
        zkc.writeData("/super", "456", -1);
        Thread.sleep(1000);

        zkc.delete("/super");
        Thread.sleep(Integer.MAX_VALUE);
    }
}
  • 輸出

變更的節點為:/super, 變更內容為:456
刪除的節點為:/super

3 zookeeper服務狀態監聽器:


public class ZkStateWatcher {
    static final String CONNECT_ADDR = "172.21.121.53:2181,172.21.121.54:2181,172.21.121.55:2181";
    static final int CONNECTION_OUTTIME = 5000;
    public static void main(String[] args) throws InterruptedException{
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR),CONNECTION_OUTTIME);
        zkc.subscribeStateChanges(new IZkStateListener() {
            
            @Override
            public void handleStateChanged(KeeperState state) throws Exception {
                if(state==KeeperState.SyncConnected){
                    //當我重新啟動後start,監聽觸發
                    System.out.println("連線成功");
                }else if(state==KeeperState.Disconnected){
                    System.out.println("連線斷開");//當我在服務端將zk服務stop時,監聽觸發
                }else
                    System.out.println("其他狀態"+state);
                
            }
            
            @Override
            public void handleNewSession() throws Exception {
                System.out.println("---->重建session");
            }
        });
//        zkc.close();
        
        Thread.sleep(Integer.MAX_VALUE);
    }
        
}
當我只連線zk服務叢集中單獨的一個zk服務時:

./zkServer.sh stop 連線斷開

./zkServer.sh start 連線成功

./zkServer.sh restart 連線斷開 連線成功

./zkServer.sh stop 連線斷開

等待幾分鐘

./zkServer.sh satrt 其他狀態Expired ---->重建session 連線成功

其實無需等待幾分鐘,在zoo.cfg配置檔案中有2個引數來管理session的超時時間:minSessionTimeout和maxSessionTimeout

2個引數的預設值都為-1,這個值在日誌中可以看到:


 

預設情況下,maxSessionTimeout = ticketTime*20; minSessionTimeout = ticketTime*2;

相關推薦

Zookeeper 開源客戶 ZkClient 版本 api介紹示例

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。 ZKClient版本及原始碼 maven依賴 ZkClient目前有兩個不同artifactId的系列。  其中最早

Zookeeper學習六】——開源客戶ZKClientCurator介紹與應用

前言 在真正的專案中通常使用的是zkclient和curator,而不是原生的zookeeper客戶端,因為zookeeper原生的客戶端存在一定的侷限性,本篇小編主要講解一下這兩種zookeeper客戶端的使用! 內容 1.1zk原生api不足之

zookeeper開源客戶Curator介紹(六)

上一篇文章介紹了zookeeper原生API的使用,使用過原生API不得不說,有很多的問題,比如:不能遞迴建立和刪除節點、Watcher只能使用一次、還有很多可以解決分散式應用問題的api(比如分散式鎖,leader選舉等),但由於ZooKeeper提供的原始

Zookeeper開源客戶ZkClient

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。 ZKClient版本及原始碼 maven依賴 ZkClient目前有兩個不同artifactI

zookeeper開源客戶Curator典型應用場景之-服務註冊與發現(十一)

隨著業務增加,以前簡單的系統已經變得越來越複雜,單純的提升伺服器效能也不是辦法,而且程式碼也是越來越龐大,維護也變得越來越困難,這一切都催生了新的架構設計風格 – 微服務架構的出現。 微服務給我們帶來了很多好處,例如:獨立可擴充套件、易維護。但是隨著應用的分解

zookeeper開源客戶Curator典型應用場景之-Barrier屏障(十三)

什麼是Barrier Barrier是這樣的:Barrier是一個同步點,每一個程序到達此點都要等待,直到某一個條件滿足,然後所有的節點繼續進行。 比如:賽跑大家都知道,所有比賽人員都會在起跑線外等待,直到教練員的槍響之後,所有參賽者立刻開始賽跑。 JDK的併

zookeeper開源客戶Curator典型應用場景之-分散式計數器(十四)

之前我們瞭解了基於Corator的分散式鎖之後,我們就很容易基於其實現一個分散式計數器,顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證

Zookeeper開源客戶框架Curator簡介

curator簡介 Netflix curator 是Netflix公司開源的一個Zookeeper client library,用於簡化zookeeper客戶端程式設計,包含一下幾個模組: curator-client - zookeeper client封裝,用於

Zookeeper開源客戶Curator之基本功能講解

簡介 Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsExcept

關於zookeeper第三方客戶zkclient的使用說明

ZkClient        在使用ZooKeeper的Java客戶端時,經常需要處理幾個問題:重複註冊watcher、session失效重連、異常處理。        要解決上述的幾個問題,可以自己解決,也可以採用第三方的java客戶端來完成。這裡就介

【hadoop zookeeperZookeeper開源客戶框架Curator簡介

Curator是Netflix開源的一套ZooKeeper客戶端框架. Netflix在使用ZooKeeper的過程中發現ZooKeeper自帶的客戶端太底層, 應用方在使用的時候需要自己處理很多事情, 於是在它的基礎上包裝了一下, 提供了一套更好用的客戶端框架. Netf

zookeeper開源客戶Curator典型應用場景之-訊息佇列(十二)

Curator框架也有分散式佇列實現。 利用ZK的PERSISTENT SEQUENTIAL(持久順序)節點,可以保證放入到佇列中的專案是按照順序排隊的。並且宕機重啟並不丟失訊息, 如果單一的消費者從佇列中取資料, 那麼它是先入先出的,這也是佇列的特點。 如果

zookeeper開源客戶Curator典型應用場景之-Master選舉(十)

在生產環境中,一般要保證服務的高可用,有時候只需要選出一臺機器來執行,其餘機器處於備用狀態,比如,在分散式系統中很常見的一個問題就是定時任務的執行。如果多臺機器同時執行相同的定時任務,業務複雜則可能出現災難性的後果。我使用的是噹噹網的elastic-job分散

ZooKeeper(四)-- 第三方客戶 ZkClient的使用

實現 kafka int java pid () config obj 9.1 前言   zkClient主要做了兩件事情:     一件是在session loss和session expire時自動創建新的ZooKeeper實例進行重連。     另一件是將一次性wat

Zookeeper分散式及客戶Curator的API簡單使用

最近公司專案中使用了分散式Zookeeper及Dubbo,為了弄清楚這些框架在專案中的使用,在我業餘時間中學習了一些Zookeeper的簡單用法,分享出來,若有不足之處,望大家給與建議...... 一、什麼是分散式系統? 我的理解:將原有的系統拆分為多個子系統組成一個龐大的系統,這個龐大

zookeeper使用(三)--開源客戶

一、前言   上一篇部落格已經介紹瞭如何使用Zookeeper提供的原生態Java API進行操作,本篇博文主要講解如何通過開源客戶端來進行操作。 二、ZkClient   ZkClient是在Zookeeper原聲API介面之上進行了包裝,是一個更易用的Zookeeper客戶端,其內部

zookeeper簡單客戶API

public class ZKclient{ public static void main(String[] args){ private String connectString="ip1:2181,ip2:2181,ip3:2181"; private int sessio

Zookeeper(七)開源客戶

經過上面兩節部落格的介紹,朋友們應該會開始簡單地使用ZooKeeper了。 在這一偏文章中,我們將圍繞ZkClient和Curator這兩個開源的ZooKeeper客戶端產品,再來進一步看看如何更好地使用ZooKeeper。 ZkClient ZkClient是Github

ZookeeperZookeeper底層客戶架構實現原理(轉載)

一次 描述 綁定 機制 一個 ini fin 源碼 receive Zookeeper的Client直接與用戶打交道,是我們使用Zookeeper的interface。了解ZK Client的結構和工作原理有利於我們合理的使用ZK,並能在使用中更早的發現問題。本文將在研究源

2017python windows 客戶最新版本3.6.2安裝教程

python第一章 下載windows版本的python登錄網址 https://www.python.org/getit/ 點擊下載Download Python 3.6.3第二章 安裝python-3.6.2.exe根據下圖提示操作即可。第三章 驗證python客戶端是否安裝成功主