1. 程式人生 > >zookeeper客戶端curator簡易使用

zookeeper客戶端curator簡易使用

zookeeper客戶端curator簡易使用


寫在前面:目前Curator有2.x.x和3.x.x兩個系列的版本,支援不同版本的Zookeeper。其中Curator 2.x.x相容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只相容Zookeeper 3.5.x,並且提供了一些諸如動態重新配置、watch刪除等新特性。本文是基於docker容器建立zookeeper環境,讀者請自行選擇zookeeper環境。如若讀者想在centos安裝docker,請參考《centos中簡易安裝docker》https://blog.csdn.net/belonghuang157405/article/details/80774506

本文參考文章:https://blog.csdn.net/haoyuyang/article/details/53469269

完整程式碼地址:https://github.com/Blankwhiter/curator

第一步 搭建zookeeper環境

在centos中,拉取zookeeper映象,以及建立zookeeper容器。

docker pull zookeeper
docker run -d -v /home/soft/zookeeperhost/zookeeperDataDir:/data -v /home/soft/zookeeperhost/zookeeperDataLogDir:/datalog  -e
ZOO_MY_ID=1 -e ZOO_SERVERS='server.1=192.168.9.129:2888:3888'--net=host --privileged zookeeper

注:192.168.9.129讀者請自行改為本機ip,此文采用的單例,而非叢集,如需叢集 讀者請自行建立.192.168.9.129

第二步 curator依賴加入

在pox.xml中加入curator包:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId
>
curator-recipes</artifactId> <version>2.12.0</version> </dependency>

第三步 測試crud操作


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;


/**
 * curator測試crud操作
 */
public class CuratorClientTest {

    /**
     * Zookeeper info 叢集用,隔開,如192.168.9.127:2181,192.168.9.128:2181,192.168.9.129:2181
     */
    private static final String ZK_ADDRESS = "192.168.9.129:2181";
    /**
     * 建立路徑
     */
    private static final String ZK_PATH_PARENT = "/new1";

    private static final String ZK_PATH = "/new1/mytest";


    public static void main(String[] args) throws Exception {

        CuratorFramework client = getClient();

        /*
        開始連線
         */
        client.start();

//        initNodeCache(client);

//        initPathChildrenCache(client);

        initTreeCache(client);

        createNode(client);

        /*
        普通查詢
         */
        byte[] bytes = client.getData().forPath(ZK_PATH);
        System.out.println(new String(bytes));
        /*
        包含狀態查詢
         */
        Stat stat = new Stat();
        byte[] bytes1 = client.getData().storingStatIn(stat).forPath(ZK_PATH);
        System.out.println(new String(bytes1));

//        updateNode(client);

//        deleteNode(client);

        Thread.sleep(15000);

        /*
        關閉連線
         */
        client.close();
    }

    /**
     * 刪除節點
     *
     * @param client
     * @throws Exception
     */
    public static void deleteNode(CuratorFramework client) throws Exception {
       /*
       刪除節點
        */
//        client.delete().forPath(ZK_PATH);
       /*
        刪除節點 並且遞迴刪除子節點
        */
//        client.delete().deletingChildrenIfNeeded().forPath(ZK_PATH);
        client.delete().guaranteed().forPath(ZK_PATH);
    }

    /**
     * 更新節點內容
     */
    public static void updateNode(CuratorFramework client) throws Exception {
       /*
        更新節點資訊 如果未傳入version引數,那麼更新當前最新版本
        */
       client.setData().forPath(ZK_PATH, "新內容3".getBytes());

        /*
         指定版本更新 版本不一直異常資訊:org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
         */
//        client.setData().withVersion(aversion).forPath(ZK_PATH);
    }

    /**
     * 建立節點資訊以及內容
     *
     * @param client
     * @throws Exception
     */
    public static void createNode(CuratorFramework client) throws Exception {
    /*
    建立節點以及對應內容。無法遞迴建立節點
     */
        client.create().forPath(ZK_PATH + "/noCursion", "noCursion".getBytes());
//        client.create().forPath(ZK_PATH_PARENT,"noCursion".getBytes());

        /*
        遞迴建立節點以及對應內容
         */
        client.create().creatingParentsIfNeeded().forPath(ZK_PATH, "testdata".getBytes());

//        client.create().forPath(ZK_PATH_PARENT).
    }

    /**
     * 獲得客戶端連線
     *
     * @return
     */
    public static CuratorFramework getClient() {
    /*
    重試策略
     */
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        /*
        CuratorFrameworkFactory工廠建立例項
         */
        return CuratorFrameworkFactory.newClient(ZK_ADDRESS, exponentialBackoffRetry);
    }

    /**
     * 初始化TreeCache的節點監聽
     *
     * @param client
     * @throws Exception
     */
    public static void initTreeCache(CuratorFramework client) throws Exception {
        TreeCache treeCache = new TreeCache(client, ZK_PATH_PARENT);
        treeCache.start();
        treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
            switch (treeCacheEvent.getType()) {
                case NODE_ADDED:
                    System.out.println("NODE_ADDED:路徑:" + treeCacheEvent.getData().getPath() + ",資料:" + new String(treeCacheEvent.getData().getData())
                            + ",狀態:" + treeCacheEvent.getData().getStat());
                    break;
                case NODE_UPDATED:
                    System.out.println("NODE_UPDATED:路徑:" + treeCacheEvent.getData().getPath() + ",資料:" + new String(treeCacheEvent.getData().getData())
                            + ",狀態:" + treeCacheEvent.getData().getStat());
                    break;
                case NODE_REMOVED:
                    System.out.println("NODE_REMOVED:路徑:" + treeCacheEvent.getData().getPath() + ",資料:" + new String(treeCacheEvent.getData().getData())
                            + ",狀態:" + treeCacheEvent.getData().getStat());
                    break;
                default:
                    break;
            }
        });
    }

    /**
     * 初始化節點監聽
     *
     * @param client
     * @throws Exception
     */
    public static void initNodeCache(CuratorFramework client) throws Exception {
    /*
     監聽節點的新增、修改操作。  最後一個引數表示是否進行壓縮
   */
        NodeCache nodeCache = new NodeCache(client, ZK_PATH_PARENT, false);
        nodeCache.start(true);
        /*
         會監聽父節點的建立和修改,刪除不會監聽
         */
        nodeCache.getListenable().addListener(() -> {
            System.out.println("nodeCache listen begin");
            System.out.println("data:" + nodeCache.getCurrentData().getData().toString());
            System.out.println("nodeCache listen end");
        });
    }

    /**
     * 初始化子節點監聽
     *
     * @param client
     * @throws Exception
     */
    public static void initPathChildrenCache(CuratorFramework client) throws Exception {
        /*
        監聽子節點的新增、修改、刪除操作。
        */
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, ZK_PATH_PARENT, true);
        /**
         * 如果不填寫這個引數,則無法監聽到子節點的資料更新
         如果引數為PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,則會預先建立之前指定的/super節點
         如果引數為PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果與BUILD_INITIAL_CACHE相同,只是不會預先建立/super節點
         引數為PathChildrenCache.StartMode.NORMAL時,與不填寫引數是同樣的效果,不會監聽子節點的資料更新操作
         */
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        pathChildrenCache.getListenable().addListener((curatorFramework, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED,型別:" + event.getType() + ",路徑:" + event.getData().getPath() + ",資料:" +
                            new String(event.getData().getData()) + ",狀態:" + event.getData().getStat());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED,型別:" + event.getType() + ",路徑:" + event.getData().getPath() + ",資料:" +
                            new String(event.getData().getData()) + ",狀態:" + event.getData().getStat());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED,型別:" + event.getType() + ",路徑:" + event.getData().getPath() + ",資料:" +
                            new String(event.getData().getData()) + ",狀態:" + event.getData().getStat());
                    break;
                default:
                    break;
            }
        });
    }