Zookeeper原始碼閱讀(七) Server端Watcher
阿新 • • 發佈:2018-11-08
前言
前面一篇主要介紹了Watcher介面相關的介面和實體類,但是主要是zk客戶端相關的程式碼,如前一篇開頭所說,client需要把watcher註冊到server端,這一篇分析下server端的watcher。
主要分析Watchmanager類。
Watchmanager
這是WatchManager的類圖介紹。來看看程式碼:
/** * This class manages watches. It allows watches to be associated with a string * and removes watchers and their watches in addition to managing triggers. */ //如註釋所言,這個類主要負責管理watcher public class WatchManager { private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class); //路徑->watcher的對映 private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>(); //watcher->路徑的對映 private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
size
public synchronized int size(){
int result = 0;
for(Set<Watcher> watches : watchTable.values()) {//遍歷路徑->watcher的對映
result += watches.size();//把所有的watch個數加起來,但這裡是不是會有重複???
}
return result;
}
addWatch
//為某個path註冊watcher public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path);//獲得路徑對應的watcher的set if (list == null) {//之前沒有watcher // don't waste memory if there are few watches on a node // rehash when the 4th entry is added, doubling size thereafter // seems like a good compromise list = new HashSet<Watcher>(4);//這裡有優化,只建立為4的set,可能是考慮到實際使用中同一個znode不會有過多的watcher,節省了memory watchTable.put(path, list);//更新watchtable } list.add(watcher);//新增watcher進入set HashSet<String> paths = watch2Paths.get(watcher);//在watcher->路徑中查詢對應的路徑 if (paths == null) { // cnxns typically have many watches, so use default cap here paths = new HashSet<String>();//同理,同一個watcher可能被加到多個znode上 watch2Paths.put(watcher, paths); } paths.add(path);//加入set }
其實這個方法總的來說就是兩大步,第一是更新路徑->watcher的對映,第二步是更新watcher->路徑的對映,很好理解。
removeWatcher
//與上面方法相反,remove對應的watcher public synchronized void removeWatcher(Watcher watcher) { HashSet<String> paths = watch2Paths.remove(watcher);//從watcher->路徑的對映中把整個watcher和它對應的所有path刪掉 if (paths == null) {//paths是否為空 return; } for (String p : paths) {//不為空的話就取出來一個一個在路徑->watcher的對映裡掃描 HashSet<Watcher> list = watchTable.get(p);//取出watcher的set if (list != null) { list.remove(watcher);//remove對應的watcher if (list.size() == 0) {//如果之前只有一個watcher,那麼相應的path就沒有watcher了,應該刪掉 watchTable.remove(p); } } } }
這裡其實也是兩大步,第一是更新watcher->路徑的對映,第二步更新路徑->watcher的對映,只是第二步的時候需要遍歷所有path。
triggerWatch
//根據事件型別和路徑觸發watcher,supress是指定的應該被過濾的watcher集合
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);//新建watchedEvent物件,這時一定是連線狀態的
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);//把對應路徑所有的watcher刪除並返回
if (watchers == null || watchers.isEmpty()) {//watcher為空直接打log
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {//watcher不為空
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);//把所有的路徑刪掉
}
}
}
for (Watcher w : watchers) {//遍歷前面獲得的所有watcher
if (supress != null && supress.contains(w)) {//如果watcher在supress的set中跳過
continue;
}
w.process(e);//不在set中就觸發
}
return watchers;
}
這裡有兩點需要特別說一下:
為啥這裡需要一個過濾的操作呢,可以通過下面datatree中deletenode裡的程式碼可以瞭解:
Set<Watcher> processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);//1 childWatches.triggerWatch(path, EventType.NodeDeleted, processed);//2 childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged);
可以看到,每個節點對應的watch會存到datawatches裡,且如果一個節點是另一個節點的子節點,那麼在server獲取getchildren指令的時候會把children相關的的watch加入到datatree的childwatches裡去。這時如果節點本身已經觸發過了那麼childwatches裡的節點的watches便不用被觸發了(因為節點都要被delete了,不存在子節點)。
- 最後的process方法並不是客戶端的watcher,而是ServerCnxn的process,預設實現是NIOServerCnxn。
@Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();//包裝watcherevent
sendResponse(h, e, "notification");//傳送回覆
}
DumpWatches
/**
* String representation of watches. Warning, may be large!
* @param byPath iff true output watches by paths, otw output
* watches by connection
* @return string representation of watches
*/
//把watch寫到磁碟中
public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
if (byPath) {
for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
pwriter.println(e.getKey());//利用PrintWriter去寫
for (Watcher w : e.getValue()) {
pwriter.print("\t0x");
pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
pwriter.print("\n");
}
}
} else {
for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
pwriter.print("0x");
pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
for (String path : e.getValue()) {
pwriter.print("\t");
pwriter.println(path);
}
}
}
}
總結
- zk的cnxn的實現由NIO和Netty兩種方式,最近工作也用了些Netty相關的,抽空好好學習總結下。
參考
https://www.cnblogs.com/leesf456/p/6288709.html
https://www.jianshu.com/p/9cf98fab15ac