zookeeper使用(二):javaAPI基本操作和迴圈監聽器使用
阿新 • • 發佈:2019-02-08
本文介紹如何連線zookeeper叢集、對zookeeper資料的crud、以及迴圈監聽器的使用
引入pom依賴
引入的zookeeper版本依賴應該與安裝的zookeeper版本一致。注,需要註釋掉type標籤
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId >zookeeper</artifactId>
<version>3.4.9</version>
<!--<type>pom</type>-->
</dependency>
CRUD測試
針對crud命令做測試的幾個方法
package com.mym.test;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
public class ZookeeperTest {
ZooKeeper zkClient = null;
String createNodeName = "/apiTest10000000007"; //記錄建立的節點名稱
/**
* 建立連線
* @throws IOException
*/
@Before
public void connect() throws IOException {
if (zkClient == null){
zkClient = new ZooKeeper("192.168.31.201:2181,192.168.31.202:2181,192.168.31.203:2181", 1000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("success to connect zk cluster!");
}
});
}
}
/**
* 測試建立節點
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testCreate() throws KeeperException, InterruptedException {
createNodeName = zkClient.create("/apiTest1", "this is api test1 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("建立節點的返回值是:"+createNodeName);
}
/**
* ls命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testLs() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
}
});
for (int i = 0; i < children.size(); i++) {
System.out.println("ls / 資料:"+children.get(i));
}
}
/**
* set命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testSet() throws KeeperException, InterruptedException {
Stat stat = zkClient.setData(createNodeName, ("this is update data! "+System.currentTimeMillis()).getBytes(), -1);//-1表示讓系統維護version
System.out.println("set 返回資料::"+stat);
}
/**
* del命令
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testDel() throws KeeperException, InterruptedException {
zkClient.delete(createNodeName,-1);
System.out.println("success to del "+createNodeName+" Znode!");
}
/**
* get
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testGet() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(createNodeName, false, null);
System.out.println("節點"+createNodeName+"的資料是:"+new String(data));
}
/**
* 測試watch
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testWatch() throws KeeperException, InterruptedException {
//先重置zkClient的watch物件(也可以在例項化zkClient時指定)
zkClient.register(watcher);
//進行監聽
byte[] data = zkClient.getData(createNodeName, true, null);
System.out.println("獲得節點"+createNodeName+"的資料是:"+new String(data));
//第一次進行修改,觸發watch
this.testSet();
//第二次進行修改,觸發watch
this.testSet();
}
/**
* 關閉連線
* @throws InterruptedException
*/
@After
public void close() throws InterruptedException {
zkClient.close();
}
/**定義watch物件*/
private Watcher watcher = new Watcher() {
public int watchCount = 0; //記錄監聽次數
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
//迴圈重複監聽
try {
zkClient.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
watchCount++;
System.out.println("第 "+watchCount+" 次監聽到!");
}
};
}
測試結果如下
- 連線上叢集時
success to connect zk cluster!
- ls
類命令輸入:ls /
對應方法testLs()
ls / 資料:watchTest
ls / 資料:shunxu20000000001
ls / 資料:shunxu10000000000
ls / 資料:zookeeper
ls / 資料:a
- create
類命令輸入:create /apiTest1 'this is api test1 data'
對應方法testCreate
建立節點的返回值是:/apiTest10000000007
- get
類命令輸入:get /名稱
對應方法testGet()
節點/apiTest10000000007的資料是:this is api test1 data
- set
類命令輸入:set /名稱 資料
對應方法testSet()
set 返回資料::4294967336,4294967345,1526852099431,1526852869041,1,0,0,0,20,0,4294967336
注,這些返回的資料就是節點的所有屬性,可以進去看下Stat
類的定義:
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
....
- delete
類命令輸入:delete /名稱
對應方法testDel()
success to del /apiTest10000000007 Znode!
測試Watch(迴圈監聽器)
邏輯就是對某個路徑在本次監聽完後主動用簡單(exist()方法)的呼叫再次註冊監聽
具體方法:
(注,這只是一個測試類,故直接把Watch匿名類物件直接例項化,不太友好,實際使用時應通過實現Watch介面,創造一個Watch實現來,從而獲得watcher物件來使用)
/**
* 測試watch
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testWatch() throws KeeperException, InterruptedException {
//先重置zkClient的watch物件(也可以在例項化zkClient時指定)
zkClient.register(watcher);
//進行監聽
byte[] data = zkClient.getData(createNodeName, true, null);
System.out.println("獲得節點"+createNodeName+"的資料是:"+new String(data));
//第一次進行修改,觸發watch
this.testSet();
//第二次進行修改,觸發watch
this.testSet();
}
/**定義watch物件*/
private Watcher watcher = new Watcher() {
public int watchCount = 0; //記錄監聽次數
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("獲得監聽事件,path:" + watchedEvent.getPath() + ";state:" + watchedEvent.getState() + ";type:" + watchedEvent.getType());
//迴圈重複監聽
try {
zkClient.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
watchCount++;
System.out.println("第 "+watchCount+" 次監聽到!");
}
};
測試結果:
獲得監聽事件,path:null;state:SyncConnected;type:None
獲得節點/apiTest10000000007的資料是:this is update data! 1532258475593
獲得監聽事件,path:/apiTest10000000007;state:SyncConnected;type:NodeDataChanged
set 返回資料::4294967352,4294967398,1526853419126,1526855521206,21,0,0,0,34,0,4294967352
第 1 次監聽到!
獲得監聽事件,path:/apiTest10000000007;state:SyncConnected;type:NodeDataChanged
set 返回資料::4294967352,4294967399,1526853419126,1526855521216,22,0,0,0,34,0,4294967352
第 2 次監聽到!
注:第一個 獲得監聽事件 是客戶端連線伺服器的自動監聽。