1. 程式人生 > >zookeeper客戶端操作及JAVA程式碼操作CURD

zookeeper客戶端操作及JAVA程式碼操作CURD

總結:zookeeper相當於一個遠端平臺,我們可以將資料放在上面,他是一個樹形結構,每一個節點稱為一個Znode

資料模型:每個節點或稱為目錄,都可以存放資料並且存放節點,有雙重功能

重點watch機制:

ZooKeeper 中,引入了 Watcher 機制來實現這種分散式的通知功能 。

總的來說可以概括 Watcher 為以下三個過程:客戶端向服務端註冊 Watcher

服務端事件發生觸發 Watcher、客戶端回撥 Watcher 得到觸發事件情況

 

首先在需要搭建一個zookeeper叢集,非常簡單,只需要修改tar包裡面的配置檔案的資料存放位置 ,另外在存放資料位置的資料夾下面新建一個myid使用者存放編號,在配置檔案中配置3個server.1=XXX:2181 即可

好了,步入正題,開始說明zookeer的相關操作

第一部分:linux搭建的zookeeper叢集中利用客戶端操作CURD

zookeeper安裝目錄中有一個客戶端/export/servers/zookeeper-3.4.5-cdh5.14.0/bin    zkCli.sh 

啟動zkCli.sh   直接輸入zkCli.sh即可  ,注意客戶端要執行,zookeeper伺服器必須執行才可以啊,否則報錯

輸入help可得到客戶端所有的命令

建立節點

create [-s] [-e] path data acl     s表示臨時  e表示序號遞增

查詢節點

ls path [watch]           後面可以跟watch監聽某個節點

修改節點

set path data [version]       

刪除節點

delete path [version]     

遞迴刪除

rmr path

獲取某個節點的詳細資訊

get path [watch]

 

第二部分: java程式碼控制zookeeper的增刪改查及watch機制監控

pom.xml檔案

引入3個依賴,curator-framework   curator-recipes   google-collections

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hadoopdemo</artifactId>
        <groupId>cn.itcast.hadoop</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>demo01_zk</artifactId>


    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>5.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>
</project>

 

新增操作

 /**
    * @Description:建立新節點
    * @:Param
    * @:Return
    */
    @Test
    public void createNode() throws Exception {
       //1 建立客戶端
        String conneciton="node01:2181,node02:2181,node03:2181";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1);
        CuratorFramework client = CuratorFrameworkFactory.newClient(conneciton,3000,3000, retryPolicy);
        //2 開啟客戶端
        client.start();
        //3 建立節點(createMode可以建立多種形式的)
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/demo01/world");
        client.close();
    }

 

修改操作

 /**
    * @Description:修改節點資料
    * @:Param
    * @:Return
    */
    @Test
    public void updateNode() throws Exception {
        //建立物件
        String connetion = "node01:2181,node02:2181,node03:2181";
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(5000,1);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connetion, retryPolicy);
        client.start();
        //修改
        client.setData().forPath("/demo01/world","您好啊".getBytes());
        client.close();
    }

 

查詢操作

 /**
    * @Description:查詢
    * @:Param
    * @:Return
    */
    @Test
    public void selectNode() throws Exception {
         //建立物件
        String connection ="node01:2181,node02:2181,node03:2181";
        RetryPolicy retryPolicy= new ExponentialBackoffRetry(5000,1);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connection, retryPolicy);
        client.start();
        //查詢
        byte[] bytes = client.getData().forPath("/demo01/world");
        System.out.println(new java.lang.String(bytes));
        client.close();

    }

 

刪除操作
    /**
    * @Description:刪除
    * @:Param
    * @:Return
    */
     @Test
    public void deleteNode() throws Exception {
        //建立
         String connction="node01:2181,node02:2181,node03:2181";
         RetryPolicy retryPolicy=new RetryForever(5000);
         CuratorFramework client = CuratorFrameworkFactory.newClient(connction, retryPolicy);
         client.start();
         //刪除
         client.delete().forPath("/hello5");
         client.close();
     }

 

watch機制(重點,跟上面的套路是一樣的,獲取客戶端client再操作)

 /**
     * @Description: watch機制
     * @:Param
     * @:Return
     */
    @Test
     public void watchNode() throws Exception {
        String connction="node01:2181,node02:2181,node03:2181";
        RetryPolicy retryPolicy= new RetryForever(5000);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connction, retryPolicy);
        client.start();
        //watch
        //設定節點cache
        TreeCache treeCache = new TreeCache(client,"/");
        //設定監聽器
        treeCache.getListenable().addListener(new TreeCacheListener() {
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if(data!=null){
                    //switch 判斷各種可能性
                    switch (event.getType()){
                        case NODE_ADDED:
                        System.out.println("節點新增:路徑:"+data.getPath()+"資料:"+new String(data.getData()));
                        break;
                        case NODE_REMOVED:
                        System.out.println("節點刪除,路徑:"+data.getPath()+"資料:"+new String(data.getData()));
                        break;
                        case NODE_UPDATED:
                        System.out.println("節點修改,路徑:"+data.getPath()+"資料:"+new String(data.getData()));
                        break;

                    }

                }

            }
        });
        //開始監聽
        treeCache.start();
        //讓程式不結束,一直監聽
        Thread.sleep(900000000);
    }

這樣如果zookeeper發生增刪改操作,那麼watch機制就會及時知道,並且打印出結果