zookeeper java api(1)
1 Zookeeper安裝以及啟動
這裡我已經進行了安裝,並且啟動了Zookeeper。埠是2182
2 Zookeeper config
tickTime=2000
initLimit=10
syncLimit=5
dataDir=D://zookiper/zookeeper/data
clientPort=2182
引數介紹
tickTime: 這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
dataDir: 顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。
clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。
initLimit: 這個配置項是用來配置 Zookeeper 接受客戶端(這裡所說的客戶端不是使用者連線 Zookeeper 伺服器的客戶端,而是 Zookeeper 伺服器集 群中連線到 Leader 的 Follower 伺服器)初始化連線時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是tickTime)長度後 Zookeeper 伺服器還沒有收到客戶端的返回資訊,那麼表明這個客戶端連線失敗。總的時間長度就是 5*2000=10 秒
syncLimit:這個配置項標識 Leader 與 Follower 之間傳送訊息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒
3 連線zk伺服器
依賴的pom
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.3-beta</version> <type>pom</type> </dependency>
public static ZooKeeper zkClient(){
final String connectString="127.0.0.1:2182";
final int sessionTimeout=5000;
ZooKeeper zooKeeper=null;
try {
zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(final WatchedEvent event) {
/** 判斷是否和伺服器之間取得了連線*/
if(event.getState()==Event.KeeperState.SyncConnected){
System.out.println("已經觸發了" + event.getType() + "事件!");
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
return zooKeeper;
}
在上述中,connectString,sessionTimeout表示連線伺服器的地址,以及客戶端連線伺服器端的session超時時間。watcher為監視器。zookeeper api和伺服器之間的連線是非同步的。當執行 ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, watcher) 這句程式碼之後,會立馬返回zkclient;當與伺服器建立好連線之後會 ,呼叫Watcher中的process方法進行處理。 process方法會接受一個WatchedEvent型別的引數,用於表明發生了什麼事件。
下屬程式碼塊可以作為連線zk的模板。
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //判斷是否已連線
if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
// 最初與zk伺服器建立好連線
} else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
// 子節點變化事件
}
// ...還可以繼續監聽其它事件型別
}
System.out.println(watchedEvent.getState());
}
WatchedEvent包含兩方面重要資訊:
1 與zk伺服器連線的狀態資訊
可以呼叫watchedEvent.getState()
方法獲取與zk伺服器連線的狀態資訊,狀態資訊取值主要包括SyncConnected、Disconnected、ConnectedReadOnly和AuthFailed等等。
2 發生的具體事件型別資訊watchedEvent.getState()
方法只是獲取與zk伺服器連線的狀態資訊,但在同一個連線狀態下,還會發生很多事件的型別。例如在zk中,我們可以watch一個節點的資料內容,當這個節點的資料被改變時,我們可以獲取到這個事件。類似的還有子節點列表變化事件等等。這就需要我們在SyncConnected同一種連線狀態下區分多個事件型別。可以通過watchedEvent.getType()
方法獲取具體的事件型別。
事件型別的取值包括None、NodeCreated、NodeDeleted、NodeDataChanged和NodeChildrenChanged。
4 建立節點
下面要介紹的每種api操作都可以分為兩種型別——同步和非同步。同步操作一般會有返回值,並且會丟擲相應的異常。非同步操作沒有返回值,也不會丟擲異常。此外非同步方法引數在同步方法引數的基礎上,會增加Callback和context兩個引數。如用同步方式建立一個節點的的程式碼如下:
- 建立同步節點
public static void createNodeSync() throws KeeperException, InterruptedException {
String path = "/poype_node";
ZooKeeper zooKeeper = zkClient();
String nodePath = zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(nodePath);
}
看ZkClient建立節點完畢:
關於上述的GUI工具,可以參見這篇文章:https://www.cnblogs.com/easyworld/p/8463910.html
同步下建立節點的方法create(*)介紹
public String create(final String path, byte data[],
List<ACL> acl,CreateMode createMode)
throws KeeperException, InterruptedException
引數介紹:
- path:建立節點的路徑
- data[] : 建立節點的資料值,引數型別是位元組陣列
- acl:節點的訪問許可權,我們這裡指定該節點可以被任何人訪問
createMode:create命令可以有-s和-e兩個引數,其中-s是順序節點,-e是臨時節點。這裡的CreateMode就是這兩個引數的組合。可選的值:
PERSISTENT |
永久節點 |
PERSISTENT_SEQUENTIAL |
永久有序節點 |
EPHEMERAL |
臨時節點 |
EPHEMERAL_SEQUENTIAL |
臨時有序節點 |
- 建立非同步節點
非同步模式方法沒有返回值,並且不會丟擲任何異常:除了同步create方法中的四個引數以外,非同步模式的create方法還增加了callback和context兩個引數。StringCallback介面中的processResult方法會在節點建立好之後被呼叫,它有四個引數。第一個是int型別的resultCode,作為建立節點的結果碼,當成功建立節點時,resultCode的值為0。第二個引數是建立節點的路徑。第三個引數是context,當一個StringCallback型別物件作為多個create方法的引數時,這個引數就很有用了。第四個引數是建立節點的名字,其實與path引數相同。
在我用其中的一種方式,怎麼也建立不了節點,就是如下的程式碼:
public static ZooKeeper zkClient(){
/** connectString ZK連線地址
* sessionTimeout 回話超時時間 單位ms
* ************************************************
* watcher 監視器
* 關於監視器:zookeeper api與伺服器建立連線的過程是非同步的。
* ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, watcher);
* 上面的呼叫會馬上從ZooKeeper建構函式返回,當與伺服器建立好連線之後會
* 呼叫Watcher中的process方法進行處理。
* process方法會接受一個WatchedEvent型別的引數,用於表明發生了什麼事件。*/
final String connectString="127.0.0.1:2182";
final int sessionTimeout=5000;
ZooKeeper zooKeeper=null;
try {
zooKeeper=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(final WatchedEvent watchedEvent) {
/** 判斷是否和伺服器之間取得了連線*/
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
System.out.println("已經觸發了" + watchedEvent.getType() + "事件!");
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
return zooKeeper;
}
private static void createNodeAsync() {
String path = "/poype_node2";
ZooKeeper zooKeeper = zkClient();
zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new MyStringCallBack(), "create");
}
static class MyStringCallBack implements AsyncCallback.StringCallback{
@Override
public void processResult(final int rc, final String path, final Object ctx, final String name) {
System.out.println("非同步建立回撥結果:狀態:" + rc +";建立路徑:" +
path + ";傳遞資訊:" + ctx + ";實際節點名稱:" + name);
}
}
最後採用另外一種方式建立了節點:
- 同步方式獲取節點資料
public class CreateNode implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2182", 5000, new CreateNode());
countDownLatch.await();
/** 非同步建立臨時節點*/
zooKeeper.create("/poype_node2", "abc".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallBack(), "我是傳遞內容");
/** 驗證等待回撥結果使用,可根據實際情況自行調整*/
Thread.sleep(10000);
}
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
countDownLatch.countDown();
}
}
static class MyStringCallBack implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("非同步建立回撥結果:狀態:" + rc +";建立路徑:" +
path + ";傳遞資訊:" + ctx + ";實際節點名稱:" + name);
}
}
}
zooKeeper.getData方法的返回值就是節點中儲存的資料值,它有三個引數,第一個引數是節點的路徑,用於表示要獲取哪個節點中的資料。第三個引數stat用於儲存節點的狀態資訊,在呼叫getData方法前,會先構造一個空的Stat型別物件作為引數傳給getData方法,當getData方法呼叫返回後,節點的狀態資訊會被填充到stat物件中。
第二個引數是一個bool型別的watch,這個引數比較重要。當watch為true時,表示我們想要監控這個節點的資料變化。當節點的資料發生變化時,我們就可以拿到zk伺服器推送給我們的通知。在process方法中會有類似下面的程式碼:
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //與zk伺服器處於連線狀態
if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
createNodeAsync();
} else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
// 節點的子節點列表發生變化
} else if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
// 節點的資料內容發生變化
}
}
}
當節點的資料內容發生變化時,我們就會接收到NodeDataChanged這個事件。值得注意的是,zooKeeper設定的監聽只生效一次,如果在接收到NodeDataChanged事件後還想繼續對該節點的資料內容改變進行監聽,需要在事件處理邏輯中重新呼叫getData方法並將watch引數設定為true。
非同步獲取一個節點資料值的程式碼如下:
- 非同步方式獲取節點資料
ublic class CreateNode implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2182", 5000, new CreateNode());
countDownLatch.await();
zooKeeper.getData("/poype_node2",true,new MyStringCallBackData(),"非同步獲取節點的資料值");
Thread.sleep(10000);
}
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
countDownLatch.countDown();
}
}
static class MyStringCallBackData implements AsyncCallback.DataCallback{
@Override
public void processResult(final int rc, final String path, final Object ctx,
final byte[] data, final Stat stat) {
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
System.out.println("***********");
System.out.println(new String(data));
System.out.println(stat);
}
}
}
參考:https://segmentfault.com/a/1190000012262940
: https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/index.html