zookeeper源碼之配置監聽
阿新 • • 發佈:2018-01-30
com continue 點數據 lis process 節點數 hashset ace tree
配置存儲不僅維護了一個樹結構,還對各個節點添加了變更監聽。
類圖
DataTree內部維護兩個通知管理器,分別監聽節點數據變更和子節點變更。
public class DataTree { private final WatchManager dataWatches = new WatchManager(); private final WatchManager childWatches = new WatchManager(); public void removeCnxn(Watcher watcher) { dataWatches.removeWatcher(watcher); childWatches.removeWatcher(watcher); }public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, long zxid, long time) throws KeeperException.NoNodeException, KeeperException.NodeExistsException { ... String parentName = path.substring(0, lastSlash); ... dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); return path; } public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException { int lastSlash = path.lastIndexOf(‘/‘); String parentName = path.substring(0, lastSlash); String childName= path.substring(lastSlash + 1); DataNode node = nodes.get(path); ... Set<Watcher> processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); childWatches.triggerWatch(path, EventType.NodeDeleted, processed); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged); } public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { ... DataNode n = nodes.get(path); ... dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; } public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); ... dataWatches.addWatch(path, watcher); ... return n.data; } } }
WatchManager主要維護了路徑和監聽器的關聯關系,當響應的路徑發生變化時,調用監聽器的監聽方法。
public class WatchManager { ... private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>(); private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>(); public synchronized int size(){ return watchTable.size(); } public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); } public synchronized void removeWatcher(Watcher watcher) { HashSet<String> paths = watch2Paths.remove(watcher); if (paths == null) { return; } for (String p : paths) { HashSet<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); if (list.size() == 0) { watchTable.remove(p); } } } } ... public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); ...for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; } ... }
Watcher定義了監聽接口方法。
public interface Watcher { abstract public void process(WatchedEvent event); }
zookeeper源碼之配置監聽