1. 程式人生 > >ZooKeeper(四)-- 第三方客戶端 ZkClient的使用

ZooKeeper(四)-- 第三方客戶端 ZkClient的使用

實現 kafka int java pid () config obj 9.1

前言

  zkClient主要做了兩件事情:

    一件是在session loss和session expire時自動創建新的ZooKeeper實例進行重連。

    另一件是將一次性watcher包裝為持久watcher。後者的具體做法是簡單的在watcher回調中,重新讀取數據的同時再註冊相同的watcher實例。

zkClient目前已經運用到了很多項目中,知名的有Dubbo、Kafka、Helix。

  zkClient jar包下載,或者直接添加maven依賴: http://mvnrepository.com/artifact/com.101tec/zkclient

maven依賴:

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

正文

1.測試類  

技術分享圖片
package com.xbq.demo;
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.IZkDataListener;

/**
 * @ClassName: ZkClientDemo 
 * @Description: TODO(zkClient 測試) 
 * @author xbq 
 * @date 2017-3-26 上午11:49:39 
 */
public class ZkClientDemo {

    // 此demo使用的集群,所以有多個ip和端口
    private static String CONNECT_SERVER = "192.168.99.138:2181,192.168.99.138:2182,192.168.99.138:2183";
    private static int SESSION_TIMEOUT = 3000;
    private static int CONNECTION_TIMEOUT = 3000;
    private static ZkClient zkClient ;
    
    static {
        zkClient = new ZkClient(CONNECT_SERVER, SESSION_TIMEOUT,CONNECTION_TIMEOUT,new MyZkSerializer());
    }
    
    public static void main(String[] args) {
        
        add(zkClient);
//        update(zkClient);
//        delete(zkClient);
        
//        addDiGui(zkClient);
//        deleteDiGui(zkClient);
        
//        subscribe(zkClient);
    }
    
    /**
     * @Title: add 
     * @Description: TODO(增加一個指定節點) 
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void add(ZkClient zkClient){
        // 如果不存在節點,就新建一個節點
        if(!zkClient.exists("/config")){
            zkClient.createPersistent("/config","javaCoder");
        }
        // 查詢一下,看是否增加成功
        String value = zkClient.readData("/config");
        System.out.println("value===" + value);
    }
    
    /**
     * @Title: addSequential 
     * @Description: TODO(遞歸創建節點) 
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void addDiGui(ZkClient zkClient){
        // 遞歸創建節點
        zkClient.createPersistent("/xbq/java/coder", true); 
        if(zkClient.exists("/xbq/java/coder")){
            System.out.println("增加成功!");
        }else {
            System.out.println("增加失敗!");
        }
    }
    
    /**
     * @Title: delete 
     * @Description: TODO(刪除指定節點) 
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void delete(ZkClient zkClient){
        // 存在節點才進行刪除
        if(zkClient.exists("/config")){
            boolean flag = zkClient.delete("/config");
            System.out.println("刪除" + (flag == true ? "成功!" : "失敗!"));
        }
    }
    
    /**
     * @Title: deleteDiGui 
     * @Description: TODO(遞歸刪除) 
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void deleteDiGui(ZkClient zkClient){
        // 存在節點才進行刪除
        if(zkClient.exists("/xbq")){
            // 遞歸刪除的時候 只傳入 父節點就可以,如果傳入 全部的節點,雖然返回的是true,但是依然是沒有刪除的,
            // 因為zkClient將異常封裝好了,進入catch的時候,會返回true,這是一個坑
            boolean flag = zkClient.deleteRecursive("/xbq");  
            System.out.println("刪除" + (flag == true ? "成功!" : "失敗!"));
        }
    }
    
    /**
     * @Title: update 
     * @Description: TODO(修改節點的值) 
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void update(ZkClient zkClient){
        if(zkClient.exists("/config")){
            zkClient.writeData("/config", "testUpdate");
            // 查詢一下,看是否修改成功
            String value = zkClient.readData("/config");
            System.out.println("value===" + value);
        }
    }
    
    /**
      * @Title: subscribe 
     * @Description: TODO(事件訂閱, 可用於配置管理) 
     * 先訂閱,再 操作增刪改。(可多個 客戶端訂閱)
     * @param @param zkClient    設定文件 
     * @return void    返回類型 
     * @throws
     */
    public static void subscribe(ZkClient zkClient){
        zkClient.subscribeDataChanges("/config/userName", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String arg0) throws Exception {
                System.out.println("觸發了刪除事件:" + arg0);
            }
            
            @Override
            public void handleDataChange(String arg0, Object arg1) throws Exception {
                System.out.println("觸發了改變事件:" + arg0 + "-->" + arg1);
            }
        });
        
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
技術分享圖片

2.自定義序列化類

技術分享圖片
package com.xbq.demo;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

/**
 * @ClassName: MyZkSerializer 
 * @Description: TODO(實現序列化接口,轉為UTF-8編碼) 
 * @author xbq 
 * @date 2017-3-26 上午11:56:22 
 */
public class MyZkSerializer implements ZkSerializer{
    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        try {
            return String.valueOf(data).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}
技術分享圖片

ZooKeeper(四)-- 第三方客戶端 ZkClient的使用