1. 程式人生 > >zookeeper(四)——Java的API、Curator、watcher

zookeeper(四)——Java的API、Curator、watcher

一、JavaAPI提供ZooKeeper新增、查詢、修改、刪除節點操作

pom檔案:

<dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.4.8</version>
</dependency>

程式碼如下:

public class ConnectionDemo {

    public static void main(String[] args) {
        try {
            final CountDownLatch countDownLatch=new CountDownLatch(1);
            ZooKeeper zooKeeper=
                    new ZooKeeper("192.168.1.200:2181," +
                            "192.168.1.201:2181,192.168.1.202:2181",
                            4000, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if(Event.KeeperState.SyncConnected==event.getState()){
                                //如果收到了服務端的響應事件,連線成功
                                countDownLatch.countDown();
                            }
                        }
                    });
            countDownLatch.await();
            System.out.println(zooKeeper.getState());//CONNECTED

            //新增節點
            zooKeeper.create("/zk-persis-cj","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            Thread.sleep(1000);
            Stat stat=new Stat();

            //得到當前節點的值
            byte[] bytes=zooKeeper.getData("/zk-persis-cj",null,stat);
            System.out.println(new String(bytes));

            //修改節點值
            zooKeeper.setData("/zk-persis-cj","1".getBytes(),stat.getVersion());

            //得到當前節點的值
            byte[] bytes1=zooKeeper.getData("/zk-persis-cj",null,stat);
            System.out.println(new String(bytes1));

            zooKeeper.delete("/zk-persis-cj",stat.getVersion());

            zooKeeper.close();

            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

Watcher監控程式碼示例: 

public class WatcherDemo {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        final CountDownLatch countDownLatch=new CountDownLatch(1);
        final ZooKeeper zooKeeper=
                new ZooKeeper("192.168.1.200:2181," +
                        "192.168.1.201:2181,192.168.1.202:2181",
                        4000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("預設事件: "+event.getType());
                        if(Event.KeeperState.SyncConnected==event.getState()){
                            //如果收到了服務端的響應事件,連線成功
                            countDownLatch.countDown();
                        }
                    }
                });
        countDownLatch.await();

        zooKeeper.create("/zk-persis-cj","1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);


        //exists  getdata getchildren
        //通過exists繫結事件
        Stat stat=zooKeeper.exists("/zk-persis-cj", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getType()+"->"+event.getPath());
                try {
                    //再一次去繫結事件
                    zooKeeper.exists(event.getPath(),true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //通過修改的事務型別操作來觸發監聽事件
        stat=zooKeeper.setData("/zk-persis-mic","2".getBytes(),stat.getVersion());

        Thread.sleep(1000);

        zooKeeper.delete("/zk-persis-mic",stat.getVersion());

        System.in.read();
    }
}

二、Curator的ZooKeeper新增、查詢、修改、刪除節點操作

pom依賴jar:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

示例程式碼:

public class CuratorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework=CuratorFrameworkFactory.
                builder().connectString("192.168.1.200:2181," +
                "192.168.1.202:2181,192.168.1.202:2181").
                sessionTimeoutMs(4000).retryPolicy(new
                ExponentialBackoffRetry(1000,3)).
                namespace("curator").build();

        curatorFramework.start();

        //結果: /curator/mic/node1
        //原生api中,必須是逐層建立,也就是父節點必須存在,子節點才能建立
        curatorFramework.create().creatingParentsIfNeeded().
                withMode(CreateMode.PERSISTENT).
                forPath("/mic/node1","1".getBytes());
        //刪除
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/mic/node1");

        Stat stat=new Stat();
        curatorFramework.getData().storingStatIn(stat).forPath("/mic/node1");

        curatorFramework.setData().
                withVersion(stat.getVersion()).forPath("/mic/node1","xx".getBytes());



        curatorFramework.close();


    }
}

Watcher監控程式碼示例:

public class CuratorWatcherDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework=CuratorFrameworkFactory.
                builder().connectString("192.168.1.200:2181," +
                "192.168.1.201:2181,192.168.1.202:2181").
                sessionTimeoutMs(4000).retryPolicy(new
                ExponentialBackoffRetry(1000,3)).
                namespace("curator").build();

        curatorFramework.start();
        //當前節點的建立和刪除事件監聽
        addListenerWithNodeCache(curatorFramework,"/cj");
        //子節點的增加、修改、刪除的事件監聽
        addListenerWithPathChildCache(curatorFramework,"/cj");

        //綜合節點監聽事件
        addListenerWithTreeCache(curatorFramework,"/cj");
        System.in.read();
    }

    public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
        TreeCache treeCache=new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListener=new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println(event.getType()+"->"+event.getData().getPath());
            }
        };

        treeCache.getListenable().addListener(treeCacheListener);
        treeCache.start();
    }

    /**
     * PathChildCache 監聽一個節點下子節點的建立、刪除、更新
     * NodeCache  監聽一個節點的更新和建立事件
     * TreeCache  綜合PatchChildCache和NodeCache的特性
     */

    public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
        PathChildrenCache pathChildrenCache=new PathChildrenCache(curatorFramework,path,true);

        PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("Receive Event:"+event.getType());
            }
        };

        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);

    }


    public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
        final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
        NodeCacheListener nodeCacheListener=new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Receive Event:"+nodeCache.getCurrentData().getPath());
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }
}