
ZooKeeper Source Code: Configuration Watches


  The configuration store does more than maintain a tree structure: it also attaches change watches to the individual nodes.

Class diagram

[Figure: class diagram]

  DataTree keeps two watch managers internally: one for changes to a node's data and one for changes to a node's children.

public class DataTree {
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();

    // Remove the given watcher from both managers.
    public void removeCnxn(Watcher watcher) {
        dataWatches.removeWatcher(watcher);
        childWatches.removeWatcher(watcher);
    }

    public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, long zxid, long time)
            throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
        ...
        String parentName = path.substring(0, lastSlash);
        ...
        // Notify data watches on the new node, then child watches on its parent.
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
        return path;
    }

    public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = nodes.get(path);
        ...
        // Watchers already notified through dataWatches are suppressed in childWatches.
        Set<Watcher> processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged);
    }

    public Stat setData(String path, byte data[], int version, long zxid, long time)
            throws KeeperException.NoNodeException {
        ...
        DataNode n = nodes.get(path);
        ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

    public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        ...
        // Reading data is also where a data watch gets registered.
        dataWatches.addWatch(path, watcher);
        ...
        return n.data;
    }
}
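
The mapping from tree operations to notifications in the excerpt above can be summarized in a small sketch. The class WatchEventPlan and its methods are hypothetical helpers written for illustration only; they are not part of ZooKeeper. They simply list, as strings, which manager is triggered with which event type and path, using the same parent-path rule as the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not ZooKeeper code): lists the notifications that each
// DataTree mutation in the excerpt triggers, as "<manager>: <event> <path>".
class WatchEventPlan {

    // Same rule as the excerpt: cut at the last '/', and map "" to the root "/".
    static String parentOf(String path) {
        String parentName = path.substring(0, path.lastIndexOf('/'));
        return parentName.equals("") ? "/" : parentName;
    }

    static List<String> onCreate(String path) {
        List<String> events = new ArrayList<>();
        events.add("dataWatches: NodeCreated " + path);
        events.add("childWatches: NodeChildrenChanged " + parentOf(path));
        return events;
    }

    static List<String> onDelete(String path) {
        List<String> events = new ArrayList<>();
        events.add("dataWatches: NodeDeleted " + path);
        // In the excerpt, watchers already notified above are suppressed here.
        events.add("childWatches: NodeDeleted " + path);
        events.add("childWatches: NodeChildrenChanged " + parentOf(path));
        return events;
    }

    static List<String> onSetData(String path) {
        List<String> events = new ArrayList<>();
        events.add("dataWatches: NodeDataChanged " + path);
        return events;
    }

    public static void main(String[] args) {
        onCreate("/app/config").forEach(System.out::println);
        onSetData("/app/config").forEach(System.out::println);
        onDelete("/app").forEach(System.out::println);
    }
}
```

Note that a change to a top-level node such as /app notifies child watches on the root path "/", because the empty parent name is replaced by "/".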

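  WatchManager mainly maintains the association between paths and watchers. When a watched path changes, it calls the process method of each watcher registered on that path.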

public class WatchManager {
 ...
    private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
    private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
    public synchronized int size(){
        return watchTable.size();
    }
    public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);
        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }
    public synchronized void removeWatcher(Watcher watcher) {
        HashSet<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            HashSet<Watcher> list = watchTable.get(p);
            if (list != null) {
                list.remove(watcher);
                if (list.size() == 0) {
                    watchTable.remove(p);
                }
            }
        }
    }
...
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // Watches are one-shot: the entry for this path is removed when it fires.
            watchers = watchTable.remove(path);
            ...
            // Keep the reverse index (watcher -> paths) consistent.
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run outside the lock; watchers in the supress set are skipped.
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
...
}
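
To make the one-shot behaviour and the two-way index easier to see, here is a minimal, self-contained sketch modeled on the excerpt above. MiniWatchManager and PathListener are hypothetical stand-ins, not ZooKeeper classes, and events are reduced to plain strings. The early return for a path with no watchers is an assumption about what the elided part of triggerWatch does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ZooKeeper's Watcher; events are plain strings here.
interface PathListener {
    void process(String eventType, String path);
}

// Simplified registry modeled on the WatchManager excerpt (not the real class).
class MiniWatchManager {
    private final Map<String, Set<PathListener>> watchTable = new HashMap<>();
    private final Map<PathListener, Set<String>> watch2Paths = new HashMap<>();

    synchronized int size() {
        return watchTable.size();
    }

    synchronized void addWatch(String path, PathListener listener) {
        // Maintain both directions: path -> listeners and listener -> paths.
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(listener);
        watch2Paths.computeIfAbsent(listener, k -> new HashSet<>()).add(path);
    }

    synchronized void removeWatcher(PathListener listener) {
        Set<String> paths = watch2Paths.remove(listener);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<PathListener> listeners = watchTable.get(p);
            if (listeners != null) {
                listeners.remove(listener);
                if (listeners.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }

    Set<PathListener> triggerWatch(String path, String eventType, Set<PathListener> suppress) {
        Set<PathListener> listeners;
        synchronized (this) {
            listeners = watchTable.remove(path); // one-shot: removed on trigger
            if (listeners == null || listeners.isEmpty()) {
                return null; // assumption: stands in for the elided handling
            }
            for (PathListener l : listeners) {
                Set<String> paths = watch2Paths.get(l);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (PathListener l : listeners) { // callbacks run outside the lock
            if (suppress != null && suppress.contains(l)) {
                continue;
            }
            l.process(eventType, path);
        }
        return listeners;
    }
}

class MiniWatchManagerDemo {
    public static void main(String[] args) {
        MiniWatchManager manager = new MiniWatchManager();
        List<String> seen = new ArrayList<>();
        PathListener listener = (type, path) -> seen.add(type + " " + path);

        manager.addWatch("/config", listener);
        manager.triggerWatch("/config", "NodeDataChanged", null);
        manager.triggerWatch("/config", "NodeDataChanged", null); // no watch left

        System.out.println(seen);           // prints [NodeDataChanged /config]
        System.out.println(manager.size()); // prints 0
    }
}
```

The second trigger delivers nothing because the first one removed the entry for /config. A client that wants continuous notifications has to register the watch again, which in the DataTree excerpt happens as part of a read such as getData.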

  Watcher defines the callback method that every watcher implements.

public interface Watcher {
    abstract public void process(WatchedEvent event);
}
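
As an illustration of implementing such a single-method callback, the sketch below shows a watcher that lets a caller block until the first event arrives. SimpleEvent, SimpleWatcher and LatchWatcher are hypothetical stand-ins defined here so the example compiles on its own; they are not the ZooKeeper types, and the latch-based design is only one possible approach.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for WatchedEvent: just an event type and a path.
class SimpleEvent {
    final String type;
    final String path;

    SimpleEvent(String type, String path) {
        this.type = type;
        this.path = path;
    }
}

// Hypothetical stand-in for the Watcher interface shown above.
interface SimpleWatcher {
    void process(SimpleEvent event);
}

// Records the first event it receives and releases any waiting caller.
class LatchWatcher implements SimpleWatcher {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile SimpleEvent last;

    @Override
    public void process(SimpleEvent event) {
        last = event;      // store before releasing waiters
        latch.countDown();
    }

    // Returns true if an event arrived within the timeout.
    boolean await(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    SimpleEvent lastEvent() {
        return last;
    }
}

class LatchWatcherDemo {
    public static void main(String[] args) {
        LatchWatcher watcher = new LatchWatcher();
        // Simulate the server side delivering an event from another thread.
        new Thread(() -> watcher.process(new SimpleEvent("NodeDataChanged", "/config"))).start();

        if (watcher.await(1000)) {
            SimpleEvent e = watcher.lastEvent();
            System.out.println(e.type + " " + e.path);
        }
    }
}
```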
