1. 程式人生 > >ZooKeeper Watcher監聽機制(資料變更的通知)(二)(分析)

ZooKeeper Watcher監聽機制(資料變更的通知)(二)(分析)

緊接著上一篇部落格:https://blog.csdn.net/Dongguabai/article/details/82970852

在輸出內容中有這樣兩個結果:

在ZooKeeper中,介面類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個列舉類,分別代表了通知狀態和事件型別,同時定義了事件的回撥方法:process(WatchedEvent event)。

那麼什麼樣的操作會產生什麼型別的事件呢:

  event For “/path”
event For “/path/child”
create(“/path”) EventType.NodeCreatedexists/getData
delete(“/path”) EventType.NodeDeletedexists/getData
setData(“/path”) EventType.NodeDataChangedexists/getData
create(“/path/child”)
EventType.NodeChildrenChanged(getChildren) EventType.NodeCreated
delete(“/path/child”) EventType.NodeChildrenChanged(getChildren) EventType.NodeDeleted
setData(“/path/child”) EventType.NodeDataChanged

這裡可以簡單測試一下:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * @author Dongguabai
 * @date 2018/10/9 19:18
 */
public class ZooKeeperWatcherDemo2 {
    static final String CONNECT_ADDR = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
    static final int SESSION_OUTTIME = 2000;//ms
    /**
     * 阻塞程式執行,等待zookeeper連線成功
     */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    static final String PATH = "/dongguabai";
    static final String PATH_CHILD = PATH + "/child";

    public static void main(String[] args) {
        try {
            //連線
            ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, event -> {
                System.out.println("事件是:" + event.getType());
                //如果收到了服務端的響應事件,連線成功
                if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
                    connectedSemaphore.countDown();
                }
            });
            connectedSemaphore.await();
            System.out.println("連線成功!");

            //binding
            zk.exists(PATH,event -> {
                System.out.println("create---"+event.getType());
            });
            //create
            zk.create(PATH,"1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

輸出結果:

再簡單測試下:

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
 * @author Dongguabai
 * @date 2018/10/9 19:18
 */
public class ZooKeeperWatcherDemo2 {
    static final String CONNECT_ADDR = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";
    static final int SESSION_OUTTIME = 2000;//ms
    /**
     * 阻塞程式執行,等待zookeeper連線成功
     */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    static final String PATH = "/dongguabai";
    static final String PATH_CHILD = PATH + "/child";

    public static void main(String[] args) {
        try {
            //連線
            ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, event -> {
                System.out.println("事件是:" + event.getType());
                //如果收到了服務端的響應事件,連線成功
                if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
                    connectedSemaphore.countDown();
                }
            });
            connectedSemaphore.await();
            System.out.println("連線成功!");

            //binding
            zk.exists(PATH_CHILD, event -> {
                System.out.println("create path child---" + event.getType());
            });

            zk.exists(PATH, event -> {
                System.out.println("create path---" + event.getType());
            });

            System.out.println("create path");
            //create
            zk.create(PATH, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.getChildren(PATH, event -> {
                System.out.println("create getChild---" + event.getType());
            });

            System.out.println("create path child");
            zk.create(PATH_CHILD, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

這裡測試的時候遇到了一個關於getChildren()的問題,具體可參看:

https://blog.csdn.net/Dongguabai/article/details/82987931

從網上其他部落格中看到的比較好的總結的表格:

事件機制的原理

在上一篇部落格也介紹過了,exists()方法是可以註冊事件的。這裡以exists()方法為例,檢視org.apache.zookeeper.ZooKeeper#exists(java.lang.String, org.apache.zookeeper.Watcher):

構建了一個WatchRegistration物件,一個header,一個request和一個response,隨後客戶端會進行request提交,看看submitRequest()方法:

這個方法會組裝一個Packet的資料包,隨後會加鎖,呼叫wait()方法進行一個阻塞操作。再來看看queuePacket()方法:

這個Packet是客戶端和服務端進行通訊的一個最小的資料單元。這裡會把相關的引數再組織成為一個Packet物件。隨後會將Packet輸出佇列中去:

最後會觸發selector的執行:

簡單流程如下:

但是明顯這個流程是不完整的,因為這裡只有傳送,那怎麼接收呢。一定會有某種機制對這個佇列進行某種資料的處理,即肯定會有個東東去消費OutgoingQueue,入隊就肯定有個出隊。

先來看看ZooKeeper的構造方法:

經過一層層的呼叫,最終會到這裡,注意這裡的start()方法:

會將這個watcher賦值給一個全域性的defaultWatcher:

後面會new一個ClientCnxn。之前也介紹過一個cnxn會呼叫submitRequest方法,那個cnxn就是在這裡初始化的。來看看這個:

這裡初始化了一些資訊,但是也做了兩個事情,構造並啟動SendThread和EventThread兩個執行緒。那麼什麼時候啟動的呢,在上面提到過的start()方法就是用來啟動的:

目前整體流程圖為:

貌似沒有什麼關聯,但是隨便一想,啟動兩個執行緒肯定是要搞點事情的,接下來就看看這兩個執行緒到底做了什麼。看看SendThread:

一般我們啟動一個執行緒肯定是呼叫它的run()方法,這個時候毫不猶豫的去看看這個方法,首先可以猜想這個方法肯定是做一些outgoingQueue出隊的相關操作。

這個run()方法特別長,沒必要過於關注細節,可以先看看判斷:

這裡是一個迴圈,即當狀態是存活的時候會進入迴圈:

再會判斷連線狀態,如果沒有開啟連線,就開啟連線,同時更新最後一次傳送的Heard。

最後一定會生成一個to:

這個to相當於是客戶端和服務端一個ping的心跳去維持連線。

如果to小於0的話,意味著會話已經超時了:

往下看,有一個很關鍵的傳輸方法,有兩種實現,基於NIO或者基於Netty:

那到底走哪一個呢,其實只要把握一點,所有的構造初始化一定是在開始的時候做好的,可以再回到ZooKeeper的構造:

看看這個方法到底做了什麼:

首先會從配置檔案中讀取:

如果是null就會是基於NIO的方式,否則會根據讀取到的clientCnxnSocketName通過反射去例項化ClientCxnSocket。即預設是走NIO的方式。

可以看看如果是基於Netty的話是怎麼做的,主要就是傳輸資料包:

可以發現一個很重要的方法:

會從之前介紹的outgoingQueue中取出資料包儲存到head中。後面就是一些判斷以及錯誤處理機制等。

再往下看會看到一個關鍵的方法:

也就是說當前從outgoingQueue中獲取到的資料如果不為空的話,就會執行這個doWrite()方法:

又會呼叫一個sendPkt()方法:

先通過p.createBB(),建立緩衝位元組,再通過Netty的Channel進行傳輸,先看看createBB()方法:

主要會序列兩個引數:

然後會把bb(ByteBuffer)封裝好。最後傳送資料包,自此流程已經走通了:

剛剛只是客戶端的資料包傳送流程,那服務端是怎麼接收資料包的呢,先看看這個類:NettyServerCnxn:

這是處理請求的一個類,裡面有這樣一個方法receiveMessage():

這裡如果bb(ByteBuffer)讀完了,會執行一個packetReceived()方法:

看看這個方法,記錄了訊息遞增的一個次數:

接下來會進行訊息的相關處理:

這是處理資料包的一個方法:

最後會構建一個服務端的Request,再submitRequest():

看看這個submitRequest()方法:

服務端會有很多的processor,它會通過一個鏈式的方式把這些processor組裝起來。這裡firstProcessor為空的那段程式碼看都不用看,肯定不為空啊,看後面那一段程式碼:

首先會驗證資料包,如果資料包驗證成功了,會通過firstProcessor.processRequest()做一個處理,而這裡會有很多的實現:

具體是哪一個呼叫我們就要看firstProsessor對應的是哪個例項。可以看看ZooKeeperServer中firstProsessor是在哪裡例項化的:

在PrepRequestProcessor中又傳遞了一個RequestProcessor:

總得來說,相當於是組裝了一個鏈式的呼叫。

所以這個時候就先去找PrepRequestProcessor:

把當前的request加到了一個submittedRequests裡面:

而這個submittedRequests就是一個佇列:

可以發現在ZooKeeper中大量的通過多執行緒的方式去非同步化流程,所以在PreRequestProcessor中一定有一個對應的執行緒去啟動。PreRequestProcessor本身就是一個執行緒:

一定會有一個run()方法:

會從submittedRequests中去take()請求,隨後會經過一系列的判斷,迴圈去處理請求,看看pRequest()方法:

根據一系列的switch語句做一些判斷。在這個方法的最後:

會發現有一個nextProcessor又去呼叫了processRequest,根據前面的規律,肯定回想,這個nextProcessor是在哪裡例項化的呢:

這個nextProcessor是通過構造傳入的,在剛剛也介紹過,這時候實際上是例項化的一個SyncRequestProcessor對應的processor:

也就是說下一個processor就是SyncRequestProcessor,再去看看SyncRequestProcessor的實現:

跟上面的很類似有木有,也是加到一個佇列中:

再通過迴圈的方式去處理:

這個思想可以去借鑑,即鏈式呼叫Thread內部維護的一個Queue,通過迴圈的方式處理Queue裡面的內容,後面會簡單寫一個小Demo演示這個思想。

在這個run()方法的最後:

又發現了nextProcessor,跟上面是型別的,再看看這個nextProcessor是從哪裡構造的:

也是通過構造傳入的,這裡套路都是一樣的,通過構造去傳入下一個引用。在剛剛也介紹過,這時候實際上是例項化的一個FinalRequestProcessor對應的processor:

照葫蘆畫瓢,再看看FinalRequestProcessor的實現:

根據名字也可以知道,這是最後一個processor了。先找我們屬性的那一段程式碼:

這個是處理exists請求的。這裡通過socket,通過netty最後把request傳到了processor,然後request反序列化後獲取到了path。獲得了path就好說了,剩下的常規操作,獲取節點的Stat:

最後會設定成resp:

最後有一個sendResponse()的方法:

其實在剛剛的獲取節點的Stat中還有一個關鍵點,即繫結Watcer:

注意這裡傳入的不是Watcher,而是cnxn:

傳的是服務端的ServerCnxn。來看看這個statNode()方法:

這裡做了一個向上轉型,因為ServerCnxn實際上是實現了Watcher:

回過頭來繼續分析statNode()方法:

如果watcher不為空,就會將path和watcher(實際上是ServerCnxn)新增到dataWatches中,這個就是核心:繫結事件:

看看WatchManager中的addWatch()方法:

之前介紹過,在FinalRequestProcessor的最後會呼叫一個sendResponse()方法:

有多個實現:

來看看Netty的實現:

看看sendbuffer()方法:

也就是通過channel.write()去傳送ByteBuffer,至此,服務端完成了響應。

總體流程圖如下:

服務端完成了響應,必然客戶端也會去接收,再來看看這個類ClientCnxnSocketNetty,有這樣一個方法messageReceived():

這個方法是接收服務端的請求,也是一個非同步的過程。

看看readResponse()方法,這個是讀取服務端返回的header和response:

首先會反序列化header,然後會判斷xid,這裡有-1、-2和-4:

官方註解也說的很明白了,-2是一個ping的請求,-4是一個授權的請求,-1表示服務端觸發事件的時候才會呼叫。

接著往後面看,會從pendingQueue中移除這個packet,為什麼要移除呢,因為packet傳送完成後它會加入到pendingQueue這個集合中,直到服務端返回response。

之後會進行一系列服務端的判斷校驗,並將服務端的資訊設定到客戶端拿到的packet中,同時會把服務端返回的response反序列化:

這裡就是Stat資訊:

比如在ZooKeeper的exists方法中,就是通過這個來獲取Stat的:

之前也說過了,這些過程都是非同步的,那麼從response獲取到的Stat會不會是空的,這個不必擔心,在前面也介紹過了,在提交請求的時候實際上是有一個阻塞的過程的:

以上就是客戶端接收響應的過程,到這裡還沒有完,因為這裡還有一個finally:

看看finishPacket()方法,也比較簡單,這裡就不詳細說明了:

客戶端接收訊息的主要流程如下:

事件註冊後,那究竟是怎麼觸發的呢?在WatchManager中有一個triggerWatch()方法:

以setData()為例,我們知道setData()方法是可以觸發Wather的,那就直接看DataTree的setData()方法:

補充說明下:DataTree類維護整個目錄樹結構ConcurrentHashMap<String, DataNode> nodes儲存了從完整路徑到DataNode的hashtable,而DataNode中的Set<String> children儲存了父子關係即子節點的相對路徑。通過某DataNode可以獲知其任意子節點的相對路徑,然後拼裝成完整路徑,再去DataTree的nodes中查詢。所有對節點路徑的訪問都是通過nodes完成的。

這個dataWatches就是WatchManager:

在這裡通過dataWatches觸發節點(path)和事件型別(EventType.NodeDataChanged),再回過頭來看這個truggerWatch()方法到底做了哪些事情:

又呼叫了一個過載的triggerWatch()方法,首先會封裝一個WatchdEvent物件,引數是事件型別,同步狀態和path。

這裡有個關鍵點是,它首先會從watchTable中異常這個path(這也就是為什麼事件只會觸發一次的原因):

這個watchTable就是之前介紹過的那個HashMap:

接著triggerWatch()方法,,緊接著又會迴圈刪除watch2Paths這個Map中的這個path,因為在WatchManager中還維護者一個key是Watcher,value是path的Map,即watch2Paths,這個之前也介紹過:

最後會迴圈出發Watch:

來看下這個process()方法,這裡是不是有點小激動,多麼關鍵的方法總算來了。

這個方法的實現有很多,那到底呼叫的是哪一個呢?

再反推這個w是從哪裡來的,這個w是從watchers中來的:

那watchers是從哪裡來的呢:

是從watchTable.remove(path)返回的。那watchTable是從哪裡來的呢:

在DataTree的statNode()方法中:

再回想一下之前的呼叫過程,在FinalRequestProcessor中,我們以exists型別為例:

這裡傳遞的是不是就是cnxn,之前也介紹過了,這裡存在一個向上轉型:

因此說明呼叫process()方法的是ServerCnxn,可以看看它的實現:

而這又是個模板的抽象方法,可以看看有哪些實現:

看看NettyServerCnxn的實現:

最終會呼叫sendResponse()方法,要注意的是這個h中的xid是-1,前面也介紹過了,-1是一個引導流程的非常重要的引數。

會對header和response做一個序列化,之後通過channel再去傳送。至此服務端的事件觸發已經打通,總的來說服務端會發送一個WatchedEvent給客戶端。

客戶端的接收流程就還是不變,畢竟程式碼是公共的嘛,先看看CientCnxnSocketNetty:

收到訊息也是會呼叫整個readResponse方法,這裡又回到之前介紹過的地方了:

這裡用-1、-2、-4代表執行什麼流程,這裡也沒有強制性的使用設計模式,如果服務端返回的是事件型別,也就是-1,就會進入下面的流程:

經過反序列化獲取服務端傳過來的WatchedEvent,最終會將WatchedEvent作為引數執行queueEvent():

這裡materializedWatchers是空的,就會執行:

來看看materialize()方法:

看到了很熟悉的existWatches,客戶端會把所有的事件放到existWatches中去:

簡單點說,ZKWatchManager是客戶端管理Watcher的,WatchManager是服務端管理Watcher的。

也就是說這裡的這個方法返回的watchers的得到的是所有的exists的watcher事件:

最終在ClientCnxn中會新增到佇列中:

而這個waitingEvents是在哪裡被呼叫呢,是在EventThread裡賣弄呼叫的,看看EventThread的run()方法:

的確也是從佇列裡面取到資料然後搞事情,搞什麼事情呢,執行processEvent()方法:

會迴圈遍歷呼叫process()方法,而真正執行的不就是我們在客戶端實現Watcher重寫的process()方法嘛。就這樣最終完成了事件的回撥。

 

更多相關資料:

https://blog.csdn.net/liu857279611/article/details/70495413

 https://www.cnblogs.com/programlearning/archive/2017/05/10/6834963.html

https://blog.csdn.net/Dongguabai/article/details/82953133

https://blog.csdn.net/Dongguabai/article/details/82907095

https://www.jianshu.com/p/d6301f07ad39

http://www.cnblogs.com/sunddenly/articles/4087251.html