1. 程式人生 > >Zookeeper原始碼閱讀(七) Server端Watcher

Zookeeper原始碼閱讀(七) Server端Watcher

前言

前面一篇主要介紹了Watcher介面相關的介面和實體類,但是主要是zk客戶端相關的程式碼,如前一篇開頭所說,client需要把watcher註冊到server端,這一篇分析下server端的watcher。

主要分析Watchmanager類。

Watchmanager

這是WatchManager的類圖介紹。來看看程式碼:

/**
 * This class manages watches. It allows watches to be associated with a string
 * and removes watchers and their watches in addition to managing triggers.
 */
//如註釋所言,這個類主要負責管理watcher
public class WatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

    //路徑->watcher的對映
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    //watcher->路徑的對映
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

size

public synchronized int size(){
    int result = 0;
    for(Set<Watcher> watches : watchTable.values()) {//遍歷路徑->watcher的對映
        result += watches.size();//把所有的watch個數加起來,但這裡是不是會有重複???
    }
    return result;
}

addWatch

//為某個path註冊watcher
public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);//獲得路徑對應的watcher的set
    if (list == null) {//之前沒有watcher
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<Watcher>(4);//這裡有優化,只建立為4的set,可能是考慮到實際使用中同一個znode不會有過多的watcher,節省了memory
        watchTable.put(path, list);//更新watchtable
    }
    list.add(watcher);//新增watcher進入set

    HashSet<String> paths = watch2Paths.get(watcher);//在watcher->路徑中查詢對應的路徑
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();//同理,同一個watcher可能被加到多個znode上
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);//加入set
}

其實這個方法總的來說就是兩大步,第一是更新路徑->watcher的對映,第二步是更新watcher->路徑的對映,很好理解。

removeWatcher

//與上面方法相反,remove對應的watcher
public synchronized void removeWatcher(Watcher watcher) {
    HashSet<String> paths = watch2Paths.remove(watcher);//從watcher->路徑的對映中把整個watcher和它對應的所有path刪掉
    if (paths == null) {//paths是否為空
        return;
    }
    for (String p : paths) {//不為空的話就取出來一個一個在路徑->watcher的對映裡掃描
        HashSet<Watcher> list = watchTable.get(p);//取出watcher的set
        if (list != null) {
            list.remove(watcher);//remove對應的watcher
            if (list.size() == 0) {//如果之前只有一個watcher,那麼相應的path就沒有watcher了,應該刪掉
                watchTable.remove(p);
            }
        }
    }
}

這裡其實也是兩大步,第一是更新watcher->路徑的對映,第二步更新路徑->watcher的對映,只是第二步的時候需要遍歷所有path。

triggerWatch

//根據事件型別和路徑觸發watcher,supress是指定的應該被過濾的watcher集合
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);//新建watchedEvent物件,這時一定是連線狀態的
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);//把對應路徑所有的watcher刪除並返回
        if (watchers == null || watchers.isEmpty()) {//watcher為空直接打log
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {//watcher不為空
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);//把所有的路徑刪掉
            }
        }
    }
    for (Watcher w : watchers) {//遍歷前面獲得的所有watcher
        if (supress != null && supress.contains(w)) {//如果watcher在supress的set中跳過
            continue;
        }
        w.process(e);//不在set中就觸發
    }
    return watchers;
}

這裡有兩點需要特別說一下:

  1. 為啥這裡需要一個過濾的操作呢,可以通過下面datatree中deletenode裡的程式碼可以瞭解:

    Set<Watcher> processed = dataWatches.triggerWatch(path,
            EventType.NodeDeleted);//1
    childWatches.triggerWatch(path, EventType.NodeDeleted, processed);//2
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            EventType.NodeChildrenChanged);

可以看到,每個節點對應的watch會存到datawatches裡,且如果一個節點是另一個節點的子節點,那麼在server獲取getchildren指令的時候會把children相關的的watch加入到datatree的childwatches裡去。這時如果節點本身已經觸發過了那麼childwatches裡的節點的watches便不用被觸發了(因為節點都要被delete了,不存在子節點)。

  1. 最後的process方法並不是客戶端的watcher,而是ServerCnxn的process,預設實現是NIOServerCnxn。
@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                 "Deliver event " + event + " to 0x"
                                 + Long.toHexString(this.sessionId)
                                 + " through " + this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();//包裝watcherevent

    sendResponse(h, e, "notification");//傳送回覆
}

DumpWatches

/**
 * String representation of watches. Warning, may be large!
 * @param byPath iff true output watches by paths, otw output
 * watches by connection
 * @return string representation of watches
 */
//把watch寫到磁碟中
public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
    if (byPath) {
        for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
            pwriter.println(e.getKey());//利用PrintWriter去寫
            for (Watcher w : e.getValue()) {
                pwriter.print("\t0x");
                pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                pwriter.print("\n");
            }
        }
    } else {
        for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
            pwriter.print("0x");
            pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
            for (String path : e.getValue()) {
                pwriter.print("\t");
                pwriter.println(path);
            }
        }
    }
}

總結

  1. zk的cnxn的實現由NIO和Netty兩種方式,最近工作也用了些Netty相關的,抽空好好學習總結下。

參考

https://www.cnblogs.com/leesf456/p/6288709.html

https://www.jianshu.com/p/9cf98fab15ac