1. 程式人生 > >【轉】Zookeeper-Watcher機制與非同步呼叫原理

【轉】Zookeeper-Watcher機制與非同步呼叫原理

宣告:本文轉載自http://shift-alt-ctrl.iteye.com/blog/1847320,轉載請務必宣告。

Watcher機制:目的是為ZK客戶端操作提供一種類似於非同步獲得資料的操作.

1)在建立Zookeeper例項時,允許接收一個watcher引數,此引數將會賦值給watchMnanger.defaultWatcher,成為當前客戶端的預設Watcher.需要注意此watcher和其他watcher不同,此wather主要是響應"與連結狀態轉換"有關的事件(比如,"建立連結","連結關閉"等,參見KeeperState).此預設watcher有zk client本地持有且生命週期伴隨整個zookeeper例項,而不是"一次觸發即消亡",當Client收到EventType,NONE型別的訊息時,則會觸發這個"預設wather"被執行..(參見:

訊息型別)

2)ZKWatchManager是客戶端watcher管理器,負責跟蹤多種watcher,watcher被分為dataWatches,existWatches,childWatches.每種型別的watcher將會被存在各自的Map中(key為path,value為Set<Watcher>,由此可見,在一個path上一種型別操作重複註冊同一個watcher物件,事實上只會生效一次,不同的watcher物件是可以的).記住:這些watcher只是一些存根,由ZKWatchManager負責管理,並不會隨請求傳送給server,而只會發給server此請求型別是否註冊了watch(原始碼:request.setWatch(boolean))

3)對於setData,exist,getChildren操作,都可以接收boolean型別的watcher標識和Watcher物件,boolean型別告知請求使用defaultWatcher物件註冊事件.

4)在ZKDatabase中,包括一個DataTree,此dataTree持有對nodes以及相關的watcher的資料.在server端,WatcherManager是管理client註冊的watcher,它只管理dataWatches和childWatches,沒有對exist型別的watch.其資料結構為HashSet<path,Set<Watcher>>,和ZKWatchManager一致.(對於exist型別的請求,sever端將其watch加入dataWatches中,這個很好理解)

5)請求到達server之後,在FinalRequestProcessor中,將會處理各種請求,如果檢測到request.getWatch()為true,即請求要求註冊watch,那麼將會把ServerCnxn和path關聯起來,加入到WatherManager相應的列表中.

6)客戶端的請求響應之後,由SendThread.readResponse()處理響應,如果響應code為成功且此請求中註冊了watch,那麼將會把此wath新增到響應的watch列表中。

7)ServerCnxn(抽象類)實現了Watcher介面,每個client在server端都對應一個ServerCnxn,此類(子類)是client請求/響應的處理器,不過所有的請求最終還是由一個執行緒負責通訊。在ServerCnxn處理請求時出現異常或者client關閉,將會導致ServerCnxn呼叫close()方法,此方法中有個分支操作就是從DataTree中的兩種watches列表中刪除其關聯的watch。

8)WatcherManager是server端watch管理器,此類包含2個不同的資料結構用來儲存watch以方便查詢,其中一個是watch2path為HashMap<Watcher, HashSet<String>>;另一個是watchTable為HashMap<String, HashSet<Watcher>>。其實這2個map儲存的資料一樣,只是查詢的場景不同;這2個map將會被同時操作。

9)DataTree持有2個WatchManager物件,分別為dataWatches用於管理註冊data操作的watch,childWatches用於管理註冊child操作的watch。

10)WatchManager中還有一個很重要的操作,trigerWatch(String path,EvenType type),當server接受到例如createNode/deleteNode/setData等操作時,將會操作ZKDatabase來操作DataTree中的資料,當然dataTree的資料改動,將會觸發相應patch(節點)上的watch(有可能一個操作會導致多種watch被觸發),trigerWatch就是在這些時機下被呼叫。此操作中就是從watchManager中將相應path下注冊的watch移除,並依次呼叫watch.process()。此process()做了一件事情,就是向client傳送一個nofication訊息,此訊息中包含一個WatchEvent物件,此物件封裝了事件的型別/path等。

11)客戶端接受到nofication,並反序獲取WatchEvent,然後和server端的watcherManager一樣,ZKWatcherManager根據event型別,從相應的一個或多個watches列表中分別移除相應path的watch,並將這些“移除”的watches再次封裝成一個WatcherSetEventPair,此物件持有event和watches集合。最後將此pair加入event佇列。

12)client的EventThread將會不斷輪詢,從event佇列中獲取pair,並遍歷pair中關聯的watcher,依次呼叫watcher的process()方法。。當然此watcher的process方法是client使用者自己實現的,因為watcher物件是client使用者在例項化zookeeper時包括各種操作時交付給zookeeper的。所以使用者應該根據自己的需要,在client受到event時做自己的處理。

 


 

F1.Watch生命週期

  1.  Zookeeper提供瞭如下幾種可以"註冊watch"的操作:exist,getChildren,getData;而對於create,setData,delete是有可能觸發"watcher"的操作.
  2. 客戶端並不會把使用者建立的watcher物件傳遞給Server,而是傳遞給server一個標記(boolean值)告知server此請求所涉及到的patch上是否有watcher..
  3. 對於client端請求是佇列話的,即一個操作阻塞直到server端響應.(非同步操作稍後介紹,它不阻塞)
  4. Server對Client的每個請求的響應體中,都會明確告知此次響應的型別(是正常操作響應還是"事件",操作對應的xid,結果型別,錯誤資訊等等);如果響應體中沒有錯誤資訊且其他校驗正常的話,我們認為此次請求被正確的執行了.
  5. 可能考慮到在Client與Server端傳遞wath物件所帶來的程式複雜度,ZK採取了"分制"的方式,在Client端和Server端分別採取了不同的技巧來儲存Watch列表;(參見上述)
  6. Server在接收Client請求時,會檢測此次request體中是否持有watcher資訊,如果有,則會導致Server端的watcher列表中新增一個此path關聯的watch,只有exist/getChildren/getData會導致此操作.記住watcher資訊將會被儲存在ZKDatabase中(記憶體中,而非持久,ZKDatabase會持久Session/ACL/Data).
  7. 那麼對於create/setData/delete請求,將會觸發watcher列表的檢測,比如create操作,建立一個path,在實際的資料儲存結束後,將會在watch列表中遍歷是否有此path所關聯的watches,如果有,則依次觸發.
  8. 觸發watch其實很簡單,對於server端而言,它持有了每個path所關聯的watch列表,而且每個watch例項正是一個ServerCnxn物件(每個Client與Server的連線處理器,就是一個ServerCnxn物件),因為觸發一個watcher將是便捷性將是顯而易見的,直接將此watcher事件所對應的path/型別直接通過IO的方式傳送出去;因此哪個Client註冊了事件,將會被響應的ServerCnxn處理;叢集中每個Server幾乎會在同一時間向Client交付事件訊息.可能因為網路的問題,不可確保他們能夠在極短的時間差內都獲得事件.
  9. "插隊",是因為對於watcher事件,將不再和其他Client操作放在同一佇列中,而是直接通過IO傳送,因為ServerCnxn處理client響應是同步的(方法是同步方法),即事件資訊將會在當前packet傳送之後被立即傳送.
  10. 事件一旦被server觸發,將會在watcher列表中刪除,因此watcher是一次性的(同一個path下的同一型別watcher).我們不能依賴wathcer來全權檢測資料的變更,因為網路斷開可能會導致事件通知的丟失;當事件被觸發之後,server端將刪除事件,即使client端再次註冊watcher,那麼"上一次事件"和"重新註冊事件"這段事件內,仍然有可能資料已經變更.(備註:Watcher watch = watchTable.remove(path);watch.process();首先從watchtable中移除watch,然後再將watch資訊傳送給client端,即使在傳送時網路異常,watch也不會再次put到watchTable中,事實上此時watch已經被消費.)
  11. Client接收到Event響應結果之後,將會把此訊息體放在eventQueue中,等待EventThread去remove並觸發.
  12. EventThread將event佇列中的事件,逐個移除並處理,每移除一個event,都會導致Client本地維護的watcher列表刪除相應的watcher(根據path和event型別決定),移除之後並獲取到Client維護的watcher物件(此物件就是先前的操作中註冊的watcher),watcher物件明確了回撥方法,此時將會執行watcher.process(),那麼呼叫者的業務方法將會在此刻被執行.[對於業務方法被執行,從整個週期中,我們可以認為是非同步的].
  13. 對於節點的create操作,將會觸發先前註冊的"exist""getChildren"事件被觸發;對於節點的delete操作,將會觸發先前註冊的"exsit""getChildren"事件被觸發;對於節點的setData操作,將會觸發先前註冊的"getData"事件被觸發......每個觸發的事件都會包含事件的型別(比如:nodeCreate,nodeDelete等),對於使用者自定義的watch.process()方法中可以根據事件型別做特定的處理.
  14. 對於Server端遇到session關閉,連線關閉等異常時,都會觸發和此連線(ServerCnxn)關聯的watch列表.
  15. 不過對於Client端卻做了"彌補";"zookeeper.disableAutoWatchReset"這個系統引數的意義就是"是否關閉watch自動重置";如果此引數為false(即為開啟"自動重置"),那麼在Client端遇到連線異常(比如重連操作)時,都會將本地已有的watcher列表全部發送給Server(此操作稱為"setWatches"),如果連線成功,那麼新的server仍然會持有watcher列表,接下來事件將會被如期觸發,就像網路異常根本就沒發生一樣..那麼為什麼ZK沒有預設開啟此引數呢?可能考慮到這是個雙刃劍,Client有可能在網路異常時會做其他的操作(因為網路異常,最終也會觸發一個本地的Event,Client可以在此Event中做自定義操作);也有可能在網路異常期間,Cluster中的資料已經被改變,極有可能這些事件中的部分事件已經被錯過,即使接下來被觸發,也將不能正確的反應目前的現狀.如果你期望獲得正確的結果,要麼重新註冊watcher,要麼檢測現有的資料是否已經改變.

Zookeeper客戶端不僅提供了同步操作,還有非同步操作,對於create/delete/exist/setData等,ZK分別提供了同步和非同步方法,我們上述瞭解到的,都是同步操作,簡單做如下列舉:

    public Stat exists(String path,Watcher watcher):同步方法,檢測path是否存在,如果存在則返回節點的全資訊,否則返回null.如果此後此path被建立或者刪除,則觸發watcher.

    public void exist(String path,Watcher watcher,StatCallback cb,Object ctx):這個方法就是非同步的,它需要指定一個StatCallback例項,以便在請求被處理之後,非同步的執行callback操作.

我相信你一定知道如何將呼叫過程設計為"非同步"[提示:非同步即為操作佇列話 + callback呼叫].

在Zookeeper中,同步方法樣例:

public ReplyHeader submitRequest(RequestHeader h, Record request,  
            Record response, WatchRegistration watchRegistration)  
            throws InterruptedException {  
        ReplyHeader r = new ReplyHeader();  
        //將請求加入佇列,此佇列將會被SendThread操作,並依此傳送請求.  
        Packet packet = queuePacket(h, r, request, response, null, null, null,  
                    null, watchRegistration);  
       //直接阻塞當前請求  
        synchronized (packet) {  
            while (!packet.finished) {  
                packet.wait();//此處阻塞,直到響應,響應被接受後,會對此packet.notify()呼叫.  
            }  
        }  
        return r;//返回處理的結果  
    } 

 那麼對於非同步操作,只調用queuePacket(....)將請求新增到佇列,然後exist方法就直接返回了.不過在響應被成功接收後,會額外的檢測此packet是否有callback,如果有,就立即執行:

private void finishPacket(Packet p) {  
        if (p.watchRegistration != null) {  
            p.watchRegistration.register(p.replyHeader.getErr());  
        }  
        //此處就是檢測callback  
        if (p.cb == null) {  
            synchronized (p) {  
                p.finished = true;  
                p.notifyAll();  
            }  
        } else {  
            p.finished = true;  
            eventThread.queuePacket(p);//將非同步呼叫packet新增到事件佇列,依此被處理.  
        }  
}

到目前為止,watcher機制我們已經走到"頭"了...