1. 程式人生 > >【zookeeper】事件 watch 機制 原理

【zookeeper】事件 watch 機制 原理

zk作為一款成熟的分散式協調框架,訂閱-釋出功能是很重要的一個。所謂訂閱釋出功能,其實說白了就是觀察者模式。觀察者會訂閱一些感興趣的主題,然後這些主題一旦變化了,就會自動通知到這些觀察者。

zk的訂閱釋出也就是watch機制,是一個輕量級的設計。因為它採用了一種推拉結合的模式。一旦服務端感知主題變了,那麼只會傳送一個事件型別和節點資訊給關注的客戶端,而不會包括具體的變更內容,所以事件本身是輕量級的,這就是所謂的“推”部分。然後,收到變更通知的客戶端需要自己去拉變更的資料,這就是“拉”部分。


訂閱-釋出在zk中是通過事件註冊和回撥機制實現的,下面看下這部分內容。

整個註冊回撥過程分為三個大的部分:客戶端註冊,服務端發回事件,客戶端回撥

1.客戶端註冊:

回撥介面:

public interface Watcher {
    abstract public void process(WatchedEvent event);
}
所有的事件回撥介面都需要實現這個介面,並在process內部實現回撥邏輯。event封裝了事件的資訊。event有兩個層級,第一個是state,第二個是evetType。不同的state有不同的type。

下面是對應關係:



zk的事件註冊介面:

zk的事件註冊介面主要有有以下的四類:

1.預設watch,也就是在new一個ZooKeeper例項代表了一個zk客戶端去連線伺服器的時候,在構造方法裡面傳入的一個預設watch的回撥介面,這個主要解決連線事件。在event中對應了syncConnected的state和none的type。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

2.通過getData,getChildren和exist三個介面。每一種又有同步和非同步兩種版本。下面只看getData版本的:

 public byte[] getData(final String path, Watcher watcher, Stat stat)
 public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)

第一個有返回值的是同步的,第二個無返回值有回撥cb的是非同步的。當然,每一個又有幾個過載版本,這裡只貼了其中的一種。

所以註冊的介面基本上是我們先實現一個watch介面,作為回撥處理邏輯,然後呼叫以上的介面來註冊感興趣的事件。那麼這個註冊過程是怎樣的?

我們重點以getData同步版本來說明,非同步的其實在註冊這一塊是一樣的,都是通過構造packet來完成。

        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
。。。
 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
在getData內部,首先構建了一個watchRegistration例項,這個類後面說,總之它封裝了了回撥介面和關注節點。然後把這個註冊物件和packetheader一起傳入了submit方法。再看submit方法:
 Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
裡面構造了一個packet,再看是如何構造的:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;
        synchronized (outgoingQueue) {
            if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
                h.setXid(getXid());
            }
            packet = new Packet(h, r, request, response, null,
                    watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!zooKeeper.state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }

        sendThread.wakeup();
        return packet;
    }
主要就是設定了packet的屬性,然後把這個請求packet送入了傳送佇列。要知道我們註冊回撥的介面本來是用來獲取資料的,所以回撥依附在了獲取這個過程中,這裡的packet構造主要是為了獲取一次資料,構建的一個請求包,我們的事件回撥依附了這個過程,然後作為了這個請求packet的屬性儲存了起來。因為我們的是同步版本,所以packet的非同步介面cb在上一步設定為了null。這裡和回撥相關的就是設定了packet的watchRegistration屬性,也就是我們傳入的回撥介面,這是通過packet的構造方法完成的。所以有必要看下一個請求packet的內部:
static class Packet {
        RequestHeader header;

        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Servers's view of the path (may differ due to chroot) **/
        String serverPath;

        ReplyHeader replyHeader;

        Record request;

        Record response;

        boolean finished;

        AsyncCallback cb;

        Object ctx;

        WatchRegistration watchRegistration;

這是packet的屬性,這裡的wathRegistration就是回撥介面,cb是getData的非同步版本的回撥,在得到資料以後的回撥函式,也就是上面我們談到的設為null的屬性,因為我們看的是getData的同步版本,所以為null。需要明確兩個回撥的區別。
到這裡,我們的事件回撥函式已經和這次getData請求的packet關聯起來的。

那麼,最後這個packet就會進入到outgoingQueue中被髮送。

也就是在SendThread的一次write過程中。

然後getData請求的資料就會被伺服器返回,在SendThread的一次read過程中,具體在readResponse函式中的最後部分,也就是finishPacket函式中,完成最後的註冊:

    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
可以看到這裡呼叫了一個register的方法。

下面需要了解下zk客戶端與註冊有關的資料結構:

在ZooKeeper類中,有一個內部類ZKWatchManager,是客戶端儲存所有的事件註冊的類,裡面有以下幾個重要的屬性,儲存回撥:

   private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

        private volatile Watcher defaultWatcher;
從名字上就可以看出各個屬性的作用,正好對應了我們開始所說的4種回撥。

map中的key就是節點的path,set就是該節點上所有的回撥。因為預設的回撥處理只有一個,所以就不是map,其餘的事件,每一個節點都可能會有多個,所以是一個set。

再看一直出現的WatchRegistration結構:

 abstract class WatchRegistration {
        private Watcher watcher;
        private String clientPath;
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }

是一個抽象類,其實就是封裝了一個事件註冊,包括了感興趣的節點和回撥函式。data,children和exist三種事件都有一個對應的實現類。這個抽象類有一個非抽象方法register,負責將packet裡面的watchRegistration放到之前的watchmanager中:
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
首先根據事件型別拿到正確的map,然後把watch回撥放入map裡面。

至此客戶端註冊一個事件回撥的邏輯就清晰了,總結就是,通過註冊函式來設定回撥介面為packet的屬性。然後在註冊函式收到其自身希望得到的資料的時候,來把回撥函式註冊到manager上。


服務端處理:

主要分為了兩部分,服務端新增事件,服務端觸發事件以後的處理。

先看服務端新增事件:

還是以剛才的getData為例,服務端的process收到了getData請求,就會返回資料,這個procesor是FinalRequestProcessor,其中處理getData請求的部分程式碼:

 case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ZooKeeperServer.byteBuffer2Record(request.request,
                        getDataRequest);
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                Long aclL;
                synchronized(n) {
                    aclL = n.acl;
                }
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                        ZooDefs.Perms.READ,
                        request.authInfo);
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }

重點是getData函式的呼叫,檢測客戶端是否註冊了watch,如果註冊了,那麼就傳cnxn,否則就傳null。這裡的cnxn其實是服務端處理io的執行緒類,後面說。getData最終會到dataTree的getData函式:
    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }
會在datawatches裡面新增watch,因為我們是data型別的watch。

在Datatree類有兩個和watch相關的屬性:

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();
分別儲存了資料的子節點的watch。再看WatchManager結構:
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

主要有兩個map,儲存了節點到watch 和 watch到節點的雙向對映,這也是服務端儲存事件的結構。這樣服務端就在相應的節點上添加了一個watch。

再看服務端觸發watch事件邏輯,比如通過setData改變資料:

在datatree的

  public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
函式的最後有一段:
  dataWatches.triggerWatch(path, EventType.NodeDataChanged);
會觸發事件:
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
最終會呼叫process函式,這裡process函式是watch介面的實現,但是這個只有客戶端才有啊。實際上,服務端這裡的實現類就是服務端的執行緒類:NIOServerCnxn。
public class NIOServerCnxn implements Watcher, ServerCnxn
再看它的process方法:
  synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }
可以看到,只是發了一個事件型別的資訊,header為-1。

客戶端執行回撥:

從上面可以看到,服務端觸發了時間以後會發送一個-1為header的相應。

那麼客戶端就會在io執行緒的read部分讀到這個資訊,最後會到readResponse函式裡處理:

            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else
                        event.setPath(serverPath.substring(chrootPath.length()));
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent( we );
                return;
            }
把事件event反序列化出來,構建一個watchedevent物件,然後把這個event扔進eventQueue裡面,通過的是queueEvent函式:
public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }
這個函式會從之前的WatchManager中恢復出之前的回撥註冊。然後就會等待eventThread來處理。

EventThread也是一個執行緒,會週期性的處理佇列裡的事件。

public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
調動事件的process函式即可。