3分鐘理解zookeeper的watcher機制
先上程式碼:
package jeff.zookeeper.watcher; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * Zookeeper Wathcher * 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類) * @authorjeff */ public class ZooKeeperWatcher implements Watcher { /** 定義原子變數 */ AtomicInteger seq = new AtomicInteger(); /** 定義session失效時間 */ private static final int SESSION_TIMEOUT = 10000; /** zookeeper伺服器地址 */ private static final String CONNECTION_ADDR = "192.168.98.98:2181,192.168.98.99:2181,192.168.98.100:2181"; /** zk父路徑設定 */ private static final String PARENT_PATH = "/p"; /** zk子路徑設定 */ private static final String CHILDREN_PATH = "/p/c1"; /** 進入標識 */ private static final String LOG_PREFIX_OF_MAIN = "【Main】"; /** zk變數 */ private ZooKeeper zk = null; /** 訊號量設定,用於等待zookeeper連線建立之後 通知阻塞程式繼續向下執行 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立ZK連線 * @param connectAddr ZK伺服器地址列表 * @param sessionTimeout Session超時時間 */ public void createConnection(String connectAddr, int sessionTimeout) { this.releaseConnection(); try { zk = new ZooKeeper(connectAddr, sessionTimeout, this); System.out.println(LOG_PREFIX_OF_MAIN + "開始連線ZK伺服器"); connectedSemaphore.await(); } catch (Exception e) { e.printStackTrace(); } } /** * 關閉ZK連線 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 建立節點 * @param path 節點路徑 * @param data 資料內容 * @return */ public boolean createPath(String path, String data,boolean watch) { try { //設定監控(由於zookeeper的監控都是一次性的所以 每次必須設定監控) this.zk.exists(path, watch); System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: " + this.zk.create( /**路徑*/ path, /**資料*/ data.getBytes(), /**所有可見*/ Ids.OPEN_ACL_UNSAFE, /**永久儲存*/ CreateMode.PERSISTENT ) + ", content: " + data); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 讀取指定節點資料內容 * @param path 節點路徑 * @return */ public String readData(String path, boolean needWatch) { try { return new String(this.zk.getData(path, needWatch, null)); } catch (Exception e) { e.printStackTrace(); return ""; } } /** * 更新指定節點資料內容 * @param path 節點路徑 * @param data 資料內容 * @return */ public boolean writeData(String path, String data) { try { System.out.println(LOG_PREFIX_OF_MAIN + "更新資料成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 刪除指定節點 * * @param path * 節點path */ public void deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path); } catch (Exception e) { e.printStackTrace(); } } /** * 判斷指定節點是否存在 * @param path 節點路徑 */ public Stat exists(String path, boolean needWatch) { try { return this.zk.exists(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 獲取子節點 * @param path 節點路徑 */ private List<String> getChildren(String path, boolean needWatch) { try { return this.zk.getChildren(path, needWatch); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 刪除所有節點 */ public void deleteAllTestPath() { if(this.exists(CHILDREN_PATH, false) != null){ this.deleteNode(CHILDREN_PATH); } if(this.exists(PARENT_PATH, false) != null){ this.deleteNode(PARENT_PATH); } } /** * 收到來自Server的Watcher通知後的處理。 */ @Override public void process(WatchedEvent event) { System.out.println("進入 process 。。。。。event = " + event); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } if (event == null) { return; } // 連線狀態 KeeperState keeperState = event.getState(); // 事件型別 EventType eventType = event.getType(); // 受影響的path String path = event.getPath(); String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】"; System.out.println(logPrefix + "收到Watcher通知"); System.out.println(logPrefix + "連線狀態:\t" + keeperState.toString()); System.out.println(logPrefix + "事件型別:\t" + eventType.toString()); if (KeeperState.SyncConnected == keeperState) { // 成功連線上ZK伺服器 if (EventType.None == eventType) { System.out.println(logPrefix + "成功連線上ZK伺服器"); connectedSemaphore.countDown(); } //建立節點 else if (EventType.NodeCreated == eventType) { System.out.println(logPrefix + "節點建立"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } this.exists(path, true); } //更新節點 else if (EventType.NodeDataChanged == eventType) { System.out.println(logPrefix + "節點資料更新"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(logPrefix + "資料內容: " + this.readData(PARENT_PATH, true)); } //更新子節點 else if (EventType.NodeChildrenChanged == eventType) { System.out.println(logPrefix + "子節點變更"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(logPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true)); } //刪除節點 else if (EventType.NodeDeleted == eventType) { System.out.println(logPrefix + "節點 " + path + " 被刪除"); } else ; } else if (KeeperState.Disconnected == keeperState) { System.out.println(logPrefix + "與ZK伺服器斷開連線"); } else if (KeeperState.AuthFailed == keeperState) { System.out.println(logPrefix + "許可權檢查失敗"); } else if (KeeperState.Expired == keeperState) { System.out.println(logPrefix + "會話失效"); } else ; System.out.println("--------------------------------------------"); } /** * <B>方法名稱:</B>測試zookeeper監控<BR> * <B>概要說明:</B>主要測試watch功能<BR> * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //建立watcher ZooKeeperWatcher zkWatch = new ZooKeeperWatcher(); //建立連線 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT); //System.out.println(zkWatch.zk.toString()); Thread.sleep(1000); // 清理節點 //zkWatch.deleteAllTestPath(); if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "",true)) { Thread.sleep(1000); // 讀取資料 System.out.println("---------------------- read parent ----------------------------"); zkWatch.readData(PARENT_PATH, true); // 更新資料 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + ""); Thread.sleep(1000); // 讀取子節點 System.out.println("---------------------- read children path ----------------------------"); zkWatch.getChildren(PARENT_PATH, true); // 建立子節點 zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "",true); //Thread.sleep(1000); //zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + ""); } //Thread.sleep(5000); // 清理節點 //zkWatch.deleteAllTestPath(); Thread.sleep(1000); zkWatch.releaseConnection(); } }
zk的事件型別和狀態型別:
資料變化對應的事件型別和狀態型別:
事件型別(跟Znode節點相關的):
EventType.NodeCreated
EventType.NodeDataChanged
EventType.NodeChildrenChanged
EventType.NodeDeleted
EventType.NONE ---連線上zk後觸發此事件型別
狀態型別(跟客戶端例項相關的):
KeeperState.Disconnected
KeeperState.SyncConnected
KeeperState.AuthFailed
KeeperState.Expired
- EventType.NONE事件
當zk客戶端連線到zk服務端觸發EventType.NONE事件,此時watcher的事件狀態是KeeperState.SyncConnected。
//建立watcher
ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
//建立連線
zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
上例子中會列印:
【Main】開始連線ZK伺服器 進入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null 【Watcher-1】收到Watcher通知 【Watcher-1】連線狀態: SyncConnected 【Watcher-1】事件型別: None 【Watcher-1】成功連線上ZK伺服器
可以看到此時的事件觸發的path為null,因為並不是節點觸發而是啟動連線成功所觸發。
- EventType.NodeCreated
例子中當我們建立/p節點時會觸發節點建立事件:
/**
* 建立節點
* @param path 節點路徑
* @param data 資料內容
* @return
*/
public boolean createPath(String path, String data,boolean watch) {
try {
//設定監控(由於zookeeper的監控都是一次性的所以 每次必須設定監控)
this.zk.exists(path, watch);
System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: " +
this.zk.create(/**路徑*/
path,
/**資料*/
data.getBytes(),
/**所有可見*/
Ids.OPEN_ACL_UNSAFE,
/**永久儲存*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
列印結果:
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p 【Main】節點建立成功, Path: /p, content: 1520667455362 【Watcher-2】收到Watcher通知 【Watcher-2】連線狀態: SyncConnected 【Watcher-2】事件型別: NodeCreated 【Watcher-2】節點建立 --------------------------------------------
注意此時我們傳入了一個watch的bool型別值,如果此時watch值為false,那麼上邊的結果還會列印嗎?不會。
- EventType.NodeDataChanged
因為zk的watcher事件的監聽是一次性的,也就是說當zk服務端將事件發生的結果通知到zk的watcher客戶端後,此前這個事件發生節點(比如/p)將不會再被監聽到任何事件,提問:如下的/p節點的資料更新事件是否會被監聽列印?
不會,除非再次設定/p的watch為true,如何操作呢?我們可以藉助exists方法或者getData方法:
那麼我們放開如下程式碼:
觀察下資料更新事件的列印:
【Main】開始連線ZK伺服器
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】連線狀態: SyncConnected
【Watcher-1】事件型別: None
【Watcher-1】成功連線上ZK伺服器
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】節點建立成功, Path: /p, content: 1520668957729
【Watcher-2】收到Watcher通知
【Watcher-2】連線狀態: SyncConnected
【Watcher-2】事件型別: NodeCreated
【Watcher-2】節點建立
--------------------------------------------
---------------------- read parent ----------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新資料成功,path:/p, stat: 34359738397,34359738398,1520697336511,1520697337551,1,0,0,0,13,0,34359738397
【Watcher-3】收到Watcher通知
【Watcher-3】連線狀態: SyncConnected
【Watcher-3】事件型別: NodeDataChanged
【Watcher-3】節點資料更新
【Watcher-3】資料內容: 1520668958801
--------------------------------------------
- EventType.NodeChildrenChanged
放開如下程式碼,請問是否能監聽到子節點的NodeChildrenChanged事件
這樣是不能的,只會列印子節點的建立事件,原因是/p節點的子節點建立事件雖然觸發,但是子節點的改變事件並沒有設定watch為true。我們設定zkWatch.getChildren(PARENT_PATH,true);再次執行,發現子節點change事件在子節點建立後被列印:
【Main】開始連線ZK伺服器
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】連線狀態: SyncConnected
【Watcher-1】事件型別: None
【Watcher-1】成功連線上ZK伺服器
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】節點建立成功, Path: /p, content: 1520669809911
【Watcher-2】收到Watcher通知
【Watcher-2】連線狀態: SyncConnected
【Watcher-2】事件型別: NodeCreated
【Watcher-2】節點建立
--------------------------------------------
---------------------- read parent ----------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新資料成功,path:/p, stat: 34359738417,34359738418,1520698188655,1520698189687,1,0,0,0,13,0,34359738417
【Watcher-3】收到Watcher通知
【Watcher-3】連線狀態: SyncConnected
【Watcher-3】事件型別: NodeDataChanged
【Watcher-3】節點資料更新
【Watcher-3】資料內容: 1520669810962
--------------------------------------------
---------------------- read children path ----------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】節點建立成功, Path: /p/c1, content: 1520669812011
【Watcher-4】收到Watcher通知
【Watcher-4】連線狀態: SyncConnected
【Watcher-4】事件型別: NodeCreated
【Watcher-4】節點建立
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】連線狀態: SyncConnected
【Watcher-5】事件型別: NodeChildrenChanged
【Watcher-5】子節點變更
- EventType.NodeDeleted
我們放開如下程式碼:
觀察列印:
【Main】開始連線ZK伺服器
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】連線狀態: SyncConnected
【Watcher-1】事件型別: None
【Watcher-1】成功連線上ZK伺服器
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】節點建立成功, Path: /p, content: 1520670313945
【Watcher-2】收到Watcher通知
【Watcher-2】連線狀態: SyncConnected
【Watcher-2】事件型別: NodeCreated
【Watcher-2】節點建立
--------------------------------------------
---------------------- read parent ----------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新資料成功,path:/p, stat: 34359738424,34359738425,1520698692709,1520698693724,1,0,0,0,13,0,34359738424
【Watcher-3】收到Watcher通知
【Watcher-3】連線狀態: SyncConnected
【Watcher-3】事件型別: NodeDataChanged
【Watcher-3】節點資料更新
【Watcher-3】資料內容: 1520670315011
--------------------------------------------
---------------------- read children path ----------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】節點建立成功, Path: /p/c1, content: 1520670316035
【Watcher-4】收到Watcher通知
【Watcher-4】連線狀態: SyncConnected
【Watcher-4】事件型別: NodeCreated
【Watcher-4】節點建立
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】連線狀態: SyncConnected
【Watcher-5】事件型別: NodeChildrenChanged
【Watcher-5】子節點變更
【Watcher-5】子節點列表:[c1]
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDeleted path:/p/c1
【Main】刪除節點成功,path:/p/c1
【Main】刪除節點成功,path:/p
【Watcher-6】收到Watcher通知
【Watcher-6】連線狀態: SyncConnected
【Watcher-6】事件型別: NodeDeleted
【Watcher-6】節點 /p/c1 被刪除
--------------------------------------------
進入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-7】收到Watcher通知
【Watcher-7】連線狀態: SyncConnected
【Watcher-7】事件型別: NodeChildrenChanged
【Watcher-7】子節點變更
由於我們對子節點/p/c1設定了監聽:
所以當刪除了/p/c1事件觸發後也同時觸發了子節點改變事件。