ZooKeeper Watcher監聽機制(資料變更的通知)(二)(分析)
緊接著上一篇部落格:https://blog.csdn.net/Dongguabai/article/details/82970852
在輸出內容中有這樣兩個結果:
在ZooKeeper中,介面類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個列舉類,分別代表了通知狀態和事件型別,同時定義了事件的回撥方法:process(WatchedEvent event)。
那麼什麼樣的操作會產生什麼型別的事件呢:
event For “/path” |
event For “/path/child” | |
create(“/path”) | EventType.NodeCreated(exists/getData) | 無 |
delete(“/path”) | EventType.NodeDeleted(exists/getData) | 無 |
setData(“/path”) | EventType.NodeDataChanged(exists/getData) | 無 |
create(“/path/child”) |
EventType.NodeChildrenChanged(getChildren) | EventType.NodeCreated |
delete(“/path/child”) | EventType.NodeChildrenChanged(getChildren) | EventType.NodeDeleted |
setData(“/path/child”) | 無 | EventType.NodeDataChanged |
這裡可以簡單測試一下:
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @author Dongguabai * @date 2018/10/9 19:18 */ public class ZooKeeperWatcherDemo2 { static final String CONNECT_ADDR = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181"; static final int SESSION_OUTTIME = 2000;//ms /** * 阻塞程式執行,等待zookeeper連線成功 */ static final CountDownLatch connectedSemaphore = new CountDownLatch(1); static final String PATH = "/dongguabai"; static final String PATH_CHILD = PATH + "/child"; public static void main(String[] args) { try { //連線 ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, event -> { System.out.println("事件是:" + event.getType()); //如果收到了服務端的響應事件,連線成功 if (Watcher.Event.KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } }); connectedSemaphore.await(); System.out.println("連線成功!"); //binding zk.exists(PATH,event -> { System.out.println("create---"+event.getType()); }); //create zk.create(PATH,"1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); } catch (Exception e) { e.printStackTrace(); } } }
輸出結果:
再簡單測試下:
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/**
* @author Dongguabai
* @date 2018/10/9 19:18
*/
public class ZooKeeperWatcherDemo2 {
static final String CONNECT_ADDR = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
static final int SESSION_OUTTIME = 2000;//ms
/**
* 阻塞程式執行,等待zookeeper連線成功
*/
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
static final String PATH = "/dongguabai";
static final String PATH_CHILD = PATH + "/child";
public static void main(String[] args) {
try {
//連線
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, event -> {
System.out.println("事件是:" + event.getType());
//如果收到了服務端的響應事件,連線成功
if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
});
connectedSemaphore.await();
System.out.println("連線成功!");
//binding
zk.exists(PATH_CHILD, event -> {
System.out.println("create path child---" + event.getType());
});
zk.exists(PATH, event -> {
System.out.println("create path---" + event.getType());
});
System.out.println("create path");
//create
zk.create(PATH, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getChildren(PATH, event -> {
System.out.println("create getChild---" + event.getType());
});
System.out.println("create path child");
zk.create(PATH_CHILD, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
這裡測試的時候遇到了一個關於getChildren()的問題,具體可參看:
https://blog.csdn.net/Dongguabai/article/details/82987931
從網上其他部落格中看到的比較好的總結的表格:
事件機制的原理
在上一篇部落格也介紹過了,exists()方法是可以註冊事件的。這裡以exists()方法為例,檢視org.apache.zookeeper.ZooKeeper#exists(java.lang.String, org.apache.zookeeper.Watcher):
構建了一個WatchRegistration物件,一個header,一個request和一個response,隨後客戶端會進行request提交,看看submitRequest()方法:
這個方法會組裝一個Packet的資料包,隨後會加鎖,呼叫wait()方法進行一個阻塞操作。再來看看queuePacket()方法:
這個Packet是客戶端和服務端進行通訊的一個最小的資料單元。這裡會把相關的引數再組織成為一個Packet物件。隨後會將Packet輸出佇列中去:
最後會觸發selector的執行:
簡單流程如下:
但是明顯這個流程是不完整的,因為這裡只有傳送,那怎麼接收呢。一定會有某種機制對這個佇列進行某種資料的處理,即肯定會有個東東去消費OutgoingQueue,入隊就肯定有個出隊。
先來看看ZooKeeper的構造方法:
經過一層層的呼叫,最終會到這裡,注意這裡的start()方法:
會將這個watcher賦值給一個全域性的defaultWatcher:
後面會new一個ClientCnxn。之前也介紹過一個cnxn會呼叫submitRequest方法,那個cnxn就是在這裡初始化的。來看看這個:
這裡初始化了一些資訊,但是也做了兩個事情,構造並啟動SendThread和EventThread兩個執行緒。那麼什麼時候啟動的呢,在上面提到過的start()方法就是用來啟動的:
目前整體流程圖為:
貌似沒有什麼關聯,但是隨便一想,啟動兩個執行緒肯定是要搞點事情的,接下來就看看這兩個執行緒到底做了什麼。看看SendThread:
一般我們啟動一個執行緒肯定是呼叫它的run()方法,這個時候毫不猶豫的去看看這個方法,首先可以猜想這個方法肯定是做一些outgoingQueue出隊的相關操作。
這個run()方法特別長,沒必要過於關注細節,可以先看看判斷:
這裡是一個迴圈,即當狀態是存活的時候會進入迴圈:
再會判斷連線狀態,如果沒有開啟連線,就開啟連線,同時更新最後一次傳送的Heard。
最後一定會生成一個to:
這個to相當於是客戶端和服務端一個ping的心跳去維持連線。
如果to小於0的話,意味著會話已經超時了:
往下看,有一個很關鍵的傳輸方法,有兩種實現,基於NIO或者基於Netty:
那到底走哪一個呢,其實只要把握一點,所有的構造初始化一定是在開始的時候做好的,可以再回到ZooKeeper的構造:
看看這個方法到底做了什麼:
首先會從配置檔案中讀取:
如果是null就會是基於NIO的方式,否則會根據讀取到的clientCnxnSocketName通過反射去例項化ClientCxnSocket。即預設是走NIO的方式。
可以看看如果是基於Netty的話是怎麼做的,主要就是傳輸資料包:
可以發現一個很重要的方法:
會從之前介紹的outgoingQueue中取出資料包儲存到head中。後面就是一些判斷以及錯誤處理機制等。
再往下看會看到一個關鍵的方法:
也就是說當前從outgoingQueue中獲取到的資料如果不為空的話,就會執行這個doWrite()方法:
又會呼叫一個sendPkt()方法:
先通過p.createBB(),建立緩衝位元組,再通過Netty的Channel進行傳輸,先看看createBB()方法:
主要會序列兩個引數:
然後會把bb(ByteBuffer)封裝好。最後傳送資料包,自此流程已經走通了:
剛剛只是客戶端的資料包傳送流程,那服務端是怎麼接收資料包的呢,先看看這個類:NettyServerCnxn:
這是處理請求的一個類,裡面有這樣一個方法receiveMessage():
這裡如果bb(ByteBuffer)讀完了,會執行一個packetReceived()方法:
看看這個方法,記錄了訊息遞增的一個次數:
接下來會進行訊息的相關處理:
這是處理資料包的一個方法:
最後會構建一個服務端的Request,再submitRequest():
看看這個submitRequest()方法:
服務端會有很多的processor,它會通過一個鏈式的方式把這些processor組裝起來。這裡firstProcessor為空的那段程式碼看都不用看,肯定不為空啊,看後面那一段程式碼:
首先會驗證資料包,如果資料包驗證成功了,會通過firstProcessor.processRequest()做一個處理,而這裡會有很多的實現:
具體是哪一個呼叫我們就要看firstProsessor對應的是哪個例項。可以看看ZooKeeperServer中firstProsessor是在哪裡例項化的:
在PrepRequestProcessor中又傳遞了一個RequestProcessor:
總得來說,相當於是組裝了一個鏈式的呼叫。
所以這個時候就先去找PrepRequestProcessor:
把當前的request加到了一個submittedRequests裡面:
而這個submittedRequests就是一個佇列:
可以發現在ZooKeeper中大量的通過多執行緒的方式去非同步化流程,所以在PreRequestProcessor中一定有一個對應的執行緒去啟動。PreRequestProcessor本身就是一個執行緒:
一定會有一個run()方法:
會從submittedRequests中去take()請求,隨後會經過一系列的判斷,迴圈去處理請求,看看pRequest()方法:
根據一系列的switch語句做一些判斷。在這個方法的最後:
會發現有一個nextProcessor又去呼叫了processRequest,根據前面的規律,肯定回想,這個nextProcessor是在哪裡例項化的呢:
這個nextProcessor是通過構造傳入的,在剛剛也介紹過,這時候實際上是例項化的一個SyncRequestProcessor對應的processor:
也就是說下一個processor就是SyncRequestProcessor,再去看看SyncRequestProcessor的實現:
跟上面的很類似有木有,也是加到一個佇列中:
再通過迴圈的方式去處理:
這個思想可以去借鑑,即鏈式呼叫Thread內部維護的一個Queue,通過迴圈的方式處理Queue裡面的內容,後面會簡單寫一個小Demo演示這個思想。
在這個run()方法的最後:
又發現了nextProcessor,跟上面是型別的,再看看這個nextProcessor是從哪裡構造的:
也是通過構造傳入的,這裡套路都是一樣的,通過構造去傳入下一個引用。在剛剛也介紹過,這時候實際上是例項化的一個FinalRequestProcessor對應的processor:
照葫蘆畫瓢,再看看FinalRequestProcessor的實現:
根據名字也可以知道,這是最後一個processor了。先找我們屬性的那一段程式碼:
這個是處理exists請求的。這裡通過socket,通過netty最後把request傳到了processor,然後request反序列化後獲取到了path。獲得了path就好說了,剩下的常規操作,獲取節點的Stat:
最後會設定成resp:
最後有一個sendResponse()的方法:
其實在剛剛的獲取節點的Stat中還有一個關鍵點,即繫結Watcer:
注意這裡傳入的不是Watcher,而是cnxn:
傳的是服務端的ServerCnxn。來看看這個statNode()方法:
這裡做了一個向上轉型,因為ServerCnxn實際上是實現了Watcher:
回過頭來繼續分析statNode()方法:
如果watcher不為空,就會將path和watcher(實際上是ServerCnxn)新增到dataWatches中,這個就是核心:繫結事件:
看看WatchManager中的addWatch()方法:
之前介紹過,在FinalRequestProcessor的最後會呼叫一個sendResponse()方法:
有多個實現:
來看看Netty的實現:
看看sendbuffer()方法:
也就是通過channel.write()去傳送ByteBuffer,至此,服務端完成了響應。
總體流程圖如下:
服務端完成了響應,必然客戶端也會去接收,再來看看這個類ClientCnxnSocketNetty,有這樣一個方法messageReceived():
這個方法是接收服務端的請求,也是一個非同步的過程。
看看readResponse()方法,這個是讀取服務端返回的header和response:
首先會反序列化header,然後會判斷xid,這裡有-1、-2和-4:
官方註解也說的很明白了,-2是一個ping的請求,-4是一個授權的請求,-1表示服務端觸發事件的時候才會呼叫。
接著往後面看,會從pendingQueue中移除這個packet,為什麼要移除呢,因為packet傳送完成後它會加入到pendingQueue這個集合中,直到服務端返回response。
之後會進行一系列服務端的判斷校驗,並將服務端的資訊設定到客戶端拿到的packet中,同時會把服務端返回的response反序列化:
這裡就是Stat資訊:
比如在ZooKeeper的exists方法中,就是通過這個來獲取Stat的:
之前也說過了,這些過程都是非同步的,那麼從response獲取到的Stat會不會是空的,這個不必擔心,在前面也介紹過了,在提交請求的時候實際上是有一個阻塞的過程的:
以上就是客戶端接收響應的過程,到這裡還沒有完,因為這裡還有一個finally:
看看finishPacket()方法,也比較簡單,這裡就不詳細說明了:
客戶端接收訊息的主要流程如下:
事件註冊後,那究竟是怎麼觸發的呢?在WatchManager中有一個triggerWatch()方法:
以setData()為例,我們知道setData()方法是可以觸發Wather的,那就直接看DataTree的setData()方法:
補充說明下:DataTree類維護整個目錄樹結構,ConcurrentHashMap<String, DataNode> nodes儲存了從完整路徑到DataNode的hashtable,而DataNode中的Set<String> children儲存了父子關係,即子節點的相對路徑。通過某DataNode可以獲知其任意子節點的相對路徑,然後拼裝成完整路徑,再去DataTree的nodes中查詢。所有對節點路徑的訪問都是通過nodes完成的。
這個dataWatches就是WatchManager:
在這裡通過dataWatches觸發節點(path)和事件型別(EventType.NodeDataChanged),再回過頭來看這個truggerWatch()方法到底做了哪些事情:
又呼叫了一個過載的triggerWatch()方法,首先會封裝一個WatchdEvent物件,引數是事件型別,同步狀態和path。
這裡有個關鍵點是,它首先會從watchTable中異常這個path(這也就是為什麼事件只會觸發一次的原因):
這個watchTable就是之前介紹過的那個HashMap:
接著triggerWatch()方法,,緊接著又會迴圈刪除watch2Paths這個Map中的這個path,因為在WatchManager中還維護者一個key是Watcher,value是path的Map,即watch2Paths,這個之前也介紹過:
最後會迴圈出發Watch:
來看下這個process()方法,這裡是不是有點小激動,多麼關鍵的方法總算來了。
這個方法的實現有很多,那到底呼叫的是哪一個呢?
再反推這個w是從哪裡來的,這個w是從watchers中來的:
那watchers是從哪裡來的呢:
是從watchTable.remove(path)返回的。那watchTable是從哪裡來的呢:
在DataTree的statNode()方法中:
再回想一下之前的呼叫過程,在FinalRequestProcessor中,我們以exists型別為例:
這裡傳遞的是不是就是cnxn,之前也介紹過了,這裡存在一個向上轉型:
因此說明呼叫process()方法的是ServerCnxn,可以看看它的實現:
而這又是個模板的抽象方法,可以看看有哪些實現:
看看NettyServerCnxn的實現:
最終會呼叫sendResponse()方法,要注意的是這個h中的xid是-1,前面也介紹過了,-1是一個引導流程的非常重要的引數。
會對header和response做一個序列化,之後通過channel再去傳送。至此服務端的事件觸發已經打通,總的來說服務端會發送一個WatchedEvent給客戶端。
客戶端的接收流程就還是不變,畢竟程式碼是公共的嘛,先看看CientCnxnSocketNetty:
收到訊息也是會呼叫整個readResponse方法,這裡又回到之前介紹過的地方了:
這裡用-1、-2、-4代表執行什麼流程,這裡也沒有強制性的使用設計模式,如果服務端返回的是事件型別,也就是-1,就會進入下面的流程:
經過反序列化獲取服務端傳過來的WatchedEvent,最終會將WatchedEvent作為引數執行queueEvent():
這裡materializedWatchers是空的,就會執行:
來看看materialize()方法:
看到了很熟悉的existWatches,客戶端會把所有的事件放到existWatches中去:
簡單點說,ZKWatchManager是客戶端管理Watcher的,WatchManager是服務端管理Watcher的。
也就是說這裡的這個方法返回的watchers的得到的是所有的exists的watcher事件:
最終在ClientCnxn中會新增到佇列中:
而這個waitingEvents是在哪裡被呼叫呢,是在EventThread裡賣弄呼叫的,看看EventThread的run()方法:
的確也是從佇列裡面取到資料然後搞事情,搞什麼事情呢,執行processEvent()方法:
會迴圈遍歷呼叫process()方法,而真正執行的不就是我們在客戶端實現Watcher重寫的process()方法嘛。就這樣最終完成了事件的回撥。
更多相關資料:
https://blog.csdn.net/liu857279611/article/details/70495413
https://www.cnblogs.com/programlearning/archive/2017/05/10/6834963.html
https://blog.csdn.net/Dongguabai/article/details/82953133
https://blog.csdn.net/Dongguabai/article/details/82907095