1. 程式人生 > >Zookeeper之開源客戶端ZkClient

Zookeeper之開源客戶端ZkClient

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。

ZKClient版本及原始碼

maven依賴

ZkClient目前有兩個不同artifactId的系列。
其中最早的0.1版本maven依賴如下:

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version
>
3.4.9</version> </dependency> <dependency> <groupId>com.github.sgroschupf</groupId> <artifactId>zkclient</artifactId> <version>0.1</version> </dependency>

另外一個系列為的maven依賴為:

<dependency>
     <groupId>org.apache.zookeeper</groupId
>
<artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>

其中第二個系列包含了從0.1~0.10的版本。檢視dubbo的原始碼,我們可以看到,dubbo採用了第一個系列的0.1版本。

原始碼

ZkClient使用

以下我們以第二個系列的0.10版本為例來說明ZKClient的API和使用

建立會話

ZkClient提供了7中建立會話的方法:

public ZkClient(String serverstring)

public ZkClient(String zkServers, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)

public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

public ZkClient(IZkConnection connection)

public ZkClient(IZkConnection connection, int connectionTimeout)

public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)

public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)

上面方法的引數如果我們熟悉原生API的話,不難理解其引數,基本上引數名都是自描述的。值得留意的是ZkClient將ZK原生API中的非同步處理進行了同步化。

其中一個引數IZkConnection是一個介面的定義。檢視介面的方法不難發現它是對ZK原生介面最直接的包裝。在此介面下面有兩個實現方法ZkConnection和InMemoryConnection。在日常中直接使用ZkConnection方法就可以解決大部分的常見業務需求。

引數ZkSerializer同樣是一個介面,定義了byte陣列序列化和反序列化的兩個方法。如果不傳遞此引數,則預設使用org.I0Itec.zkclient.serialize.SerializableSerializer實現類進行序列化。某些情況下此序列化介面會出現問題,比如亂碼。此時,開發者可以直接實現ZkSerializer介面,重寫自己的序列化方法。比如使用Hessian或Kryo等。

通觀上面9個建立會話的構造方法,我們發現已經沒有Watcher的存在了。同時,ZkClient通過Listener來實現Wather註冊,從API級別來支援Watcher監聽的註冊。

package com.secbro.learn.zkclient;

import org.I0Itec.zkclient.ZkClient;

/**
 * Created by zhuzs on 2017/3/31.
 */
public class TestZkClient {
    public static void main(String[] args) {

        ZkClient zkClient = new ZkClient("127.0.0.1:2181",5000);
        System.out.println("ZK 成功建立連線!");
    }
}

建立節點

ZkClient提供了15個建立節點的方法:

public void createPersistent(String path)

public void createPersistent(String path, boolean createParents)

public void createPersistent(String path, boolean createParents, List<ACL> acl)

public void createPersistent(String path, Object data)

public void createPersistent(String path, Object data, List<ACL> acl)

public String createPersistentSequential(String path, Object data)

public String createPersistentSequential(String path, Object data, List<ACL> acl) 

public void createEphemeral(final String path)

public void createEphemeral(final String path, final List<ACL> acl)

public String create(final String path, Object data, final CreateMode mode)

public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) 

public void createEphemeral(final String path, final Object data)

public void createEphemeral(final String path, final Object data, final List<ACL> acl)

public String createEphemeralSequential(final String path, final Object data)

public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)

檢視原始碼可知,其實這些建立節點的方法都是對原生API的一層封裝而已,底層實現基本相同。值得留意的一點是,原生API的引數通過byte[]來傳遞節點內容,而ZkClient支援自定義序列化,因此可以傳輸Object物件。

createParents引數決定了是否遞迴建立父節點。true表示遞迴建立,false表示不使用遞迴建立。這也正是ZkClient幫開發人員省去了不少繁瑣的檢查和建立父節點的過程。

刪除節點

刪除節點提供了以下方法:

public boolean delete(final String path)

public boolean delete(final String path, final int version)

public boolean deleteRecursive(String path)

刪除API其實很簡單,重點說一下deleteRecursive介面,這個介面提供了遞迴刪除的功能。在原生API中,如果一個節點存在子節點,那麼它將無法直接刪除,必須一層層遍歷先刪除全部子節點,然後才能將目標節點刪除。

讀取節點

獲取節點列表

public List<String> getChildren(String path)

此介面返回子節點的相對路徑列表。比如節點路徑為/test/a1和/test/a2,那麼當path為/test時,返回的結果為[a1,a2]。

其中在原始API中,對節點註冊Watcher,當節點被刪除或其下面的子節點新增或刪除時,會通知客戶端。在ZkClient中,通過Listener監聽來實現,後續會將到具體的使用方法。

可以註冊的Listener為,介面IZkChildListener下面的方法來實現:

public void handleChildChange(String parentPath, List<String> currentChilds)

獲取節點內容

public <T extends Object> T readData(String path)

public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)

public <T extends Object> T readData(String path, Stat stat)

通過方法返回引數的定義,就可以得知,返回的結果(節點的內容)已經被反序列化成物件了。

對本介面實現監聽的介面為IZkDataListener,分別提供了處理資料變化和刪除操作的監聽:

public void handleDataChange(String dataPath, Object data) throws Exception;

public void handleDataDeleted(String dataPath) throws Exception;

更新資料

更新操作可以通過以下介面來實現:

public void writeData(String path, Object object)

public void writeData(final String path, Object datat, final int expectedVersion)

public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)

監測節點是否存在

此API比較簡單,呼叫以下方法即可:

protected boolean exists(final String path, final boolean watch)

註冊監聽

在ZkClient中客戶端可以通過註冊相關的事件監聽來實現對Zookeeper服務端時間的訂閱。其中ZkClient提供的監聽事件介面有以下幾種:

介面類 註冊監聽方法 解除監聽方法
IZkChildListener ZkClient的subscribeChildChanges方法 ZkClient的unsubscribeChildChanges方法
IZkDataListener ZkClient的subscribeDataChanges方法 ZkClient的subscribeChildChanges方法
IZkStateListener ZkClient的subscribeStateChanges方法 ZkClient的unsubscribeStateChanges方法

其中ZkClient還提供了一個unsubscribeAll方法,來解除所有監聽。

下面以IZkChildListener為例來舉例說明使用方法:

package com.secbro.learn.zkclient;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

/**
 * Created by zhuzs on 2017/3/31.
 */
public class TestZkClient {
    public static void main(String[] args) throws InterruptedException {

        ZkClient zkClient = new ZkClient("127.0.0.1:2181",5000);
        System.out.println("ZK 成功建立連線!");

        String path = "/zk-test";
        // 註冊子節點變更監聽(此時path節點並不存在,但可以進行監聽註冊)
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("路徑" + parentPath +"下面的子節點變更。子節點為:" + currentChilds );
            }
        });

        // 遞迴建立子節點(此時父節點並不存在)
        zkClient.createPersistent("/zk-test/a1",true);
        Thread.sleep(5000);
        System.out.println(zkClient.getChildren(path));
    }
}

執行結果為:

ZK 成功建立連線!
路徑/zk-test下面的子節點變更。子節點為:[a1][a1]

上面展示了Listener的基本使用方法,其他方法的使用讀者可以自行嘗試。