java客戶端呼叫zookeeper
阿新 • • 發佈:2018-11-28
1、引入pom依賴
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency>
2、呼叫
/** * 1、startZK通過java程式,新建連結zk,類似jdbc的connection,open.session * 2、createZNode新建一個znode節點/atguigu並設定為hello0228 等同於create /atguigu hello1221 Ids.OPEN_ACL_UNSAFE * 3、getZnode獲得當前節點/atguigu的最新值 get /atguigu * 4、stopZK關閉連結 */ public class HelloZK { /** * Logger for this class */ private static final Logger logger = Logger.getLogger(HelloZK.class); private static final String CONNECTSTRING = "192.168.154.162:2181"; private static final String PATH = "/atguigu"; //Node節點 private static final int SESSION_TIMEOUT = 20 * 1000; //sesseion超時時間 /** * startZK通過java程式,新建連結zk, */ public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent arg0) { // TODO Auto-generated method stub } }); } /** * createZNode新建一個znode節點 */ public void createZNode(ZooKeeper zk, String path, String data) throws KeeperException, InterruptedException { zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * getZnode獲得當前節點 */ public String getZnode(ZooKeeper zk, String path) throws KeeperException, InterruptedException { String result = ""; byte[] data = zk.getData(path, false, new Stat()); result = new String(data); return result; } /** * stopZK關閉連結 */ public void stopZK(ZooKeeper zk) throws InterruptedException { if (zk != null) zk.close(); } public static void main(String[] args) throws Exception { HelloZK helloZK = new HelloZK(); ZooKeeper zk = helloZK.startZK(); if (zk.exists(PATH, false) == null) { helloZK.createZNode(zk, PATH, "hello0228"); String znode = helloZK.getZnode(zk, PATH); logger.info("---------- znode =" + znode); } else { logger.info("----------this znode is ok"); } helloZK.stopZK(zk); } }
3、zookeeper通知機制(Watcher:觀察者模式)
/** * * @Description: *1 初始化ZK的多個操作 * 1.1 建立ZK的連結 * 1.2 建立/atguigu節點並賦值 * 1.3 獲得該節點的值 * * 2 watchmore * 2.1 獲得值之後設定一個觀察者watcher,如果/atguigu該節點的值發生了變化,要求通知Client端,一次性通知 * * 3 watchMore * 3.1 獲得值之後設定一個觀察者watcher,如果/atguigu該節點的值發生了變化,要求通知Client端,繼續觀察 * 3.2 又再次獲得新的值的同時再新設定一個觀察者,繼續觀察並獲得值 * 3.3 又再次獲得新的值的同時再新設定一個觀察者,繼續觀察並獲得值.。。。。。重複上述過程 * @author xialei */ public class WatchMore { private static final Logger logger = Logger.getLogger(WatchMore.class); //例項常量 private static final String CONNECTION_STRING = "192.168.154.162:2181"; private static final String PATH = "/atguigu"; private static final int SESSION_TIMEOUT = 20 * 1000; //例項變數 private ZooKeeper zk = null; private String oldValue = null; private String newValue = null; public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public String getOldValue() { return oldValue; } public void setOldValue(String oldValue) { this.oldValue = oldValue; } public String getNewValue() { return newValue; } public void setNewValue(String newValue) { this.newValue = newValue; } /** * * * @Title: startZK * @Description: 獲得ZK的session連線物件例項 * @param @return * @param @throws IOException 引數 * @return ZooKeeper 返回型別 * @throws */ public ZooKeeper startZK() throws IOException { return new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { } }); } /** * * @Title: createZnode * @Description: 再給定的路徑下建立znode節點並賦值 * @param @param zk * @param @param path * @param @param data * @param @throws KeeperException * @param @throws InterruptedException 引數 * @return void 返回型別 * @throws */ public void createZnode(String path,String data) throws KeeperException, InterruptedException { zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } /** * * @Title: getZnode * @Description: 獲取我們對應節點的值 * @param @param zk * @param @param path * @param @return * @param @throws KeeperException * @param @throws InterruptedException 引數 * @return String 返回型別 * @throws */ public String getZnode(String path) throws KeeperException, InterruptedException { String result = ""; byte[] byteArray = zk.getData(path, new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); result = new String(byteArray); oldValue = result; return result; } public boolean triggerValue(String path) throws KeeperException, InterruptedException { String result = ""; byte[] byteArray = zk.getData(path, new Watcher() { @Override public void process(WatchedEvent event) { try { triggerValue(path); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }, new Stat()); result = new String(byteArray); newValue = result; if(oldValue.equals(newValue)) { logger.info("-----------no changes---------0000 "); return false; }else { logger.info("-----------newValue: "+newValue+"\t oldValue: "+oldValue); oldValue = newValue; return true; } } public static void main(String[] args) throws Exception { WatchMore watchMore = new WatchMore(); watchMore.setZk(watchMore.startZK()); if(watchMore.getZk().exists(PATH, false) == null) { watchMore.createZnode(PATH, "AAA"); String result = watchMore.getZnode(PATH);//AAA if (logger.isInfoEnabled()) { logger.info("main(String[]) --------init String result=" + result); } }else { logger.info("this node has already exists!!!!"); } Thread.sleep(Long.MAX_VALUE); } }