1. 程式人生 > ># Zookeeper Curator 事件監聽 - 史上最詳

# Zookeeper Curator 事件監聽 - 史上最詳

瘋狂創客圈 Java 分散式聊天室【 億級流量】實戰系列之 -25【 部落格園 總入口


文章目錄

寫在前面

​ 大家好,我是作者尼恩。目前和幾個小夥伴一起,組織了一個高併發的實戰社群【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程式 學習和實戰

​ 前面,已經完成一個高效能的 Java 聊天程式的四件大事:

接下來,需要進入到分散式開發的環節了。 分散式的中介軟體,瘋狂創客圈的小夥伴們,一致的選擇了zookeeper,不僅僅是由於其在大資料領域,太有名了。更重要的是,很多的著名框架,都使用了zk。

本篇介紹 ZK Curator 的事件監聽

1.1. Curator 事件監聽

Curator 事件有兩種模式,一種是標準的觀察模式,一種是快取監聽模式。標準的監聽模式是使用Watcher 監聽器。第二種快取監聽模式引入了一種本地快取檢視的Cache機制,來實現對Zookeeper服務端事件監聽。

Cache事件監聽可以理解為一個本地快取檢視與遠端Zookeeper檢視的對比過程。Cache提供了反覆註冊的功能。Cache是一種快取機制,可以藉助Cache實現監聽。簡單來說,Cache在客戶端快取了znode的各種狀態,當感知到zk叢集的znode狀態變化,會觸發event事件,註冊的監聽器會處理這些事件。

Watcher 監聽器比較簡單,只有一種。Cache事件監聽的種類有3種Path Cache,Node Cache,Tree Cache。

1.1.1. Watcher 標準的事件處理器

在ZooKeeper中,介面類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個列舉類,分別代表了通知狀態和事件型別。

Watcher介面定義了事件的回撥方法:process(WatchedEvent event)。定義一個Watcher的例項很簡單,程式碼如下:

Watcher w = new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
        log.info("監聽器watchedEvent:" + watchedEvent);
    }
};

使用Watcher監聽器例項的方式也很簡單,在Curator的呼叫鏈上,加上usingWatcher方法即可,程式碼如下:

byte[] content = client.getData()
        .usingWatcher(w).forPath(workerPath);

一個Watcher監聽器在向服務端完成註冊後,當服務端的一些事件觸發了這個Watcher,那麼就會向指定客戶端傳送一個事件通知,來實現分散式的通知功能。客戶收到伺服器的通知後,Curator 會封裝一個WatchedEvent 事件例項,傳遞給監聽器的回撥方法process(WatchedEvent event)。

WatchedEvent包含了三個基本屬性:

(1)通知狀態(keeperState)

(2)事件型別(EventType)

(3)節點路徑(path)

注意,WatchedEvent並不是直接從ZooKeeper叢集直接傳遞過來的事件例項,而是Curator 封裝過的事件例項。WatchedEvent型別沒有實現序列化介面java.io.Serializable,因此不能用於網路傳輸。ZooKeeper叢集直接網路傳輸傳遞過來的事件例項是啥呢? 是一個WatcherEvent型別的例項,這個傳輸例項和Curator 封裝過的WatchedEvent例項,在名稱上有一個字母之差,而且功能也是一樣的,都表示的是同一個事物,都是對一個服務端事件的封裝。

因此,這裡只講Curator 封裝過的WatchedEvent例項。下邊列舉了ZooKeeper中最常見的幾個通知狀態和事件型別。

KeeperState EventType 觸發條件 說明
None (-1) 客戶端與服務端成功建立連線
SyncConnected (0) NodeCreated (1) Watcher監聽的對應資料節點被建立
NodeDeleted (2) Watcher監聽的對應資料節點被刪除 此時客戶端和伺服器處於連線狀態
NodeDataChanged (3) Watcher監聽的對應資料節點的資料內容發生變更
NodeChildChanged (4) Wather監聽的對應資料節點的子節點列表發生變更
Disconnected (0) None (-1) 客戶端與ZooKeeper伺服器斷開連線 此時客戶端和伺服器處於斷開連線狀態
Expired (-112) Node (-1) 會話超時 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常
AuthFailed (4) None (-1) 通常有兩種情況,1:使用錯誤的schema進行許可權檢查 2:SASL許可權檢查失敗 通常同時也會收到AuthFailedException異常

利用Watcher來對節點進行監聽操作,但此監聽操作只能監聽一次。來看一個簡單的例項程式:

@Slf4j

@Data

public class ZkWatcherDemo {

 

    private String workerPath = "/test/listener/node";
    private String subWorkerPath = "/test/listener/node/id-";

 
    @Test
    public void testWatcher() {
        CuratorFramework client = ZKclient.instance.getClient();

        //檢查節點是否存在,沒有則建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        try {

            Watcher w = new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("監聽到的變化 watchedEvent = " + watchedEvent);
                }
            };

            byte[] content = client.getData()
                    .usingWatcher(w).forPath(workerPath);

            log.info("監聽節點內容:" + new String(content));

            // 第一次變更節點資料
            client.setData().forPath(workerPath, "第1次更改內容".getBytes());

            // 第二次變更節點資料
            client.setData().forPath(workerPath, "第2次更改內容".getBytes());

            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

		//....

}

執行程式碼,輸出的結果如下:

監聽到的變化 watchedEvent = WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/node

程式中,對節點路徑 “/test/listener/node”註冊一個Watcher監聽器例項,隨後呼叫setData方法兩次改變節點內容,但是,監聽器僅僅監聽到了一個事件。也就是說,當第二次改變節點內容時,監聽已經失效,無法再次獲得節點變動事件。

也就是說,Watcher監聽器是一次性的,如果要反覆使用,就需要反覆的使用usingWatcher提前註冊。

所以,Watcher監聽器不能應用於節點的資料變動或者節點變動這樣的一般業務場景。而是適用於一些特殊的,比如會話超時、授權失敗等這樣的特殊場景。

既然Watcher監聽器是一次性的,在開發過程中需要反覆註冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反覆註冊監聽。

1.1.2. NodeCache 節點快取的監聽

Curator引入的Cache快取實現,是一個系列,包括了Node Cache 、Path Cache、Tree Cache三組類。其中Node Cache節點快取可以用於ZNode節點的監聽,Path Cache子節點快取用於ZNode的子節點的監聽,而Tree Cache樹快取是Path Cache的增強,不光能監聽子節點,也能監聽ZNode節點自身。

Node Cache,可以用於監控本節點的新增,刪除,更新。

Node Cache使用的第一步,就是構造一個NodeCache快取例項。

有兩個構造方法,具體如下:

NodeCache(CuratorFramework client, String path) 

NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 

第一個引數就是傳入建立的Curator的框架客戶端,第二個引數就是監聽節點的路徑,第三個過載引數dataIsCompressed 表示是否對資料進行壓縮。

NodeCache使用的第二步,就是構造一個NodeCacheListener監聽器例項。該介面的定義如下:

package org.apache.curator.framework.recipes.cache;

public interface NodeCacheListener {

    void nodeChanged() throws Exception;

}

NodeCacheListener監聽器介面,只定義了一個簡單的方法 nodeChanged,當節點變化時,這個方法就會被回撥到。

在建立完NodeCacheListener的例項之後,需要將這個例項註冊到NodeCache快取例項,使用快取例項的addListener方法。 然後使用快取例項nodeCache的start方法,啟動節點的事件監聽。

nodeCache.getListenable().addListener(l);

nodeCache.start(); 

強調下,需要呼叫nodeCache的start方法能進行快取和事件監聽,這個方法有兩個版本:

void    start()//Start the cache.

void    start(boolean buildInitial)  //true代表快取當前節點

唯一的一個引數buildInitial代表著是否將該節點的資料立即進行快取。如果設定為true的話,在start啟動時立即呼叫NodeCache的getCurrentData方法就能夠得到對應節點的資訊ChildData類,如果設定為false的就得不到對應的資訊。

使用NodeCache來監聽節點的事件,完整的例項程式碼如下:

    @Test
    public void testNodeCache() {

        //檢查節點是否存在,沒有則建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();
        try {
            NodeCache nodeCache =
                    new NodeCache(client, workerPath, false);
            NodeCacheListener l = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData childData = nodeCache.getCurrentData();
                    log.info("ZNode節點狀態改變, path={}", childData.getPath());
                    log.info("ZNode節點狀態改變, data={}", new String(childData.getData(), "Utf-8"));
                    log.info("ZNode節點狀態改變, stat={}", childData.getStat());
                }
            };
            nodeCache.getListenable().addListener(l);
            nodeCache.start();

            // 第1次變更節點資料
            client.setData().forPath(workerPath, "第1次更改內容".getBytes());
            Thread.sleep(1000);

            // 第2次變更節點資料
            client.setData().forPath(workerPath, "第2次更改內容".getBytes());

            Thread.sleep(1000);

            // 第3次變更節點資料
            client.setData().forPath(workerPath, "第3次更改內容".getBytes());
            Thread.sleep(1000);

            // 第4次變更節點資料
//            client.delete().forPath(workerPath);
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            log.error("建立NodeCache監聽失敗, path={}", workerPath);
        }
    }

執行的結果是,NodeCashe節點快取能夠重複的進行事件節點。程式碼中的第三次監聽的輸出節選如下:

\- ZNode節點狀態改變, path=/test/listener/node

\- ZNode節點狀態改變, data=第3次更改內容

\- ZNode節點狀態改變, stat=17179869191,...

最後說明一下,如果NodeCache監聽的節點為空(也就是說傳入的路徑不存在)。那麼如果我們後面建立了對應的節點,也是會觸發事件從而回調nodeChanged方法。

1.1.3. PathChildrenCache 子節點監聽

PathChildrenCache子節點快取用於子節點的監聽,監控本節點的子節點被建立、更新或者刪除。需要強調兩點:

(1)只能監聽子節點,監聽不到當前節點

(2)不能遞迴監聽,子節點下的子節點不能遞迴監控

PathChildrenCache子節點快取使用的第一步,就是構造一個快取例項。

有多個過載版本的構造方法,選擇4個進行說明,具體如下:

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, 
         boolean dataIsCompressed,final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
         boolean dataIsCompressed,ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
         ThreadFactory threadFactory)

所有的構造方法,前三個引數,都是一樣的。

第一個引數就是傳入建立的Curator的框架客戶端,第二個引數就是監聽節點的路徑,第三個過載引數cacheData表示是否把節點內容快取起來。如果cacheData為true,那麼接收到節點列表變更事件的同時,會將獲得節點內容。

dataIsCompressed引數(如果有),表示是否對節點資料進行壓縮。

executorService 和threadFactory引數差不多,表示通過傳入的執行緒池或者執行緒工廠,來非同步處理監聽事件。

threadFactory引數(如果有)表示執行緒池工廠,當PathChildrenCache內部需要開啟新的執行緒執行時,使用該執行緒池工廠來建立執行緒。

PathChildrenCache子節點快取使用的第二步,就是構造一個子節點快取監聽器PathChildrenCacheListener例項。該介面的定義如下:

package org.apache.curator.framework.recipes.cache;

import org.apache.curator.framework.CuratorFramework;
 
public interface PathChildrenCacheListener {

   void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;

}

PathChildrenCacheListener監聽器介面中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回撥到。

在建立完PathChildrenCacheListener的例項之後,需要將這個例項註冊到PathChildrenCache快取例項,使用快取例項的addListener方法。 然後使用快取例項nodeCache的start方法,啟動節點的事件監聽。

這裡的start方法,需要傳入啟動的模式。可以傳入三種模式,也就是API列表中看到的StartMode,其中定義了下面三種列舉:

(1)NORMAL——非同步初始化cache

(2)BUILD_INITIAL_CACHE——同步初始化cache

(3)POST_INITIALIZED_EVENT——非同步初始化cache,並觸發完成事件

對於start模式的三種啟動方式,詳細的說明如下:

BUILD_INITIAL_CACHE:啟動時,同步初始化cache,以及建立cache後,就從伺服器拉取對應的資料。

POST_INITIALIZED_EVENT:啟動時,非同步初始化cache,初始化完成觸發PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener會收到該事件的通知。

最後是第一個列舉常量,NORMAL:啟動時,非同步初始化cache,完成後不會發出通知。

使用PathChildrenCache來監聽節點的事件,完整的例項程式碼如下:


    @Test
    public void testPathChildrenCache() {

        //檢查節點是否存在,沒有則建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();

        try {
            PathChildrenCache cache =
                    new PathChildrenCache(client, workerPath, true);
            PathChildrenCacheListener l =
                    new PathChildrenCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client,
                                               PathChildrenCacheEvent event) {
                            try {
                                ChildData data = event.getData();
                                switch (event.getType()) {
                                    case CHILD_ADDED:

                                        log.info("子節點增加, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));

                                        break;
                                    case CHILD_UPDATED:
                                        log.info("子節點更新, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    case CHILD_REMOVED:
                                        log.info("子節點刪除, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    default:
                                        break;
                                }

                            } catch (
                                    UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    };
            cache.getListenable().addListener(l);
            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.createNode(subWorkerPath + i, null);
            }

            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.deleteNode(subWorkerPath + i);
            }

             } catch (Exception e) {
            log.error("PathCache監聽失敗, path=", workerPath);
        }

    }

執行的結果如下:

\- 子節點增加, path=/test/listener/node/id-0, data=to set content

\- 子節點增加, path=/test/listener/node/id-2, data=to set content

\- 子節點增加, path=/test/listener/node/id-1, data=to set content

......

\- 子節點刪除, path=/test/listener/node/id-2, data=to set content

\- 子節點刪除, path=/test/listener/node/id-0, data=to set content

\- 子節點刪除, path=/test/listener/node/id-1, data=to set content

可以看到,PathChildrenCache 能夠反覆的監聽到節點的新增和刪除。

簡單說下Curator的監聽原理,無論是PathChildrenCache,還是TreeCache,所謂的監聽,都是進行Curator本地快取檢視和ZooKeeper伺服器遠端的資料節點的對比。

在什麼場景下觸發事件呢?

以節點增加事件NODE_ADDED為例,所在本地快取檢視開始的時候,本地檢視為空,在資料同步的時候,本地的監聽器就能監聽到NODE_ADDED事件。這是因為,剛開始本地快取並沒有內容,然後本地快取和伺服器快取進行對比,發現ZooKeeper伺服器有節點而本地快取沒有,這才將伺服器的節點快取到本地,就會觸發本地快取的NODE_ADDED事件。

1.1.4. Tree Cache 節點樹快取

前面已經講完了兩個系列的快取監聽。簡單回顧一下:

Node Cache用來觀察ZNode自身,如果ZNode節點本身被建立,更新或者刪除,那麼Node Cache會更新快取,並觸發事件給註冊的監聽器。Node Cache是通過NodeCache類來實現的,監聽器對應的介面為NodeCacheListener。

Path Cache子節點快取用來觀察ZNode的子節點、並快取子節點的狀態,如果ZNode的子節點被建立,更新或者刪除,那麼Path Cache會更新快取,並且觸發事件給註冊的監聽器。Path Cache是通過PathChildrenCache類來實現的,監聽器註冊是通過PathChildrenCacheListener。

最後的一個系列,是Tree Cache。Tree Cache可以看做是上兩種的合體,Tree Cache觀察的是當前ZNode節點的所有資料。而TreeCache節點樹快取是PathChildrenCache的增強,不光能監聽子節點,也能監聽節點自身。

Tree Cache使用的第一步,就是構造一個TreeCache快取例項。

有兩個構造方法,具體如下:

TreeCache(CuratorFramework client, String path) 
 

TreeCache(CuratorFramework client, String path,
          boolean cacheData, boolean dataIsCompressed, int maxDepth, 
		 ExecutorService executorService, boolean createParentNodes,
		 TreeCacheSelector selector) 

第一個引數就是傳入建立的Curator的框架客戶端,第二個引數就是監聽節點的路徑,第三個過載引數dataIsCompressed 表示是否對資料進行壓縮。maxDepth表示快取的層次深度,預設為整數最大值。executorService 表示監聽的的執行執行緒池,預設會建立一個單一執行緒的執行緒池。createParentNodes 表示是否建立父親節點,預設為false。

一般情況下,使用第一個建構函式即可。

TreeCache使用的第二步,就是構造一個TreeCacheListener監聽器例項。該介面的定義如下:

package org.apache.curator.framework.recipes.cache;

 import org.apache.curator.framework.CuratorFramework;

public interface TreeCacheListener {
    void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;

}

TreeCacheListener 監聽器介面中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回撥到。

在建立完TreeCacheListener 的例項之後,使用快取例項的addListener方法,將TreeCacheListener 監聽器例項註冊到TreeCache 快取例項。 然後使用快取例項nodeCache的start方法,啟動節點的事件監聽。

整個例項的程式碼如下:

 @Test
    public void testTreeCache() {

        //檢查節點是否存在,沒有則建立
        boolean isExist = ZKclient.instance.isNodeExist(workerPath);
        if (!isExist) {
            ZKclient.instance.createNode(workerPath, null);
        }

        CuratorFramework client = ZKclient.instance.getClient();

        try {
            TreeCache treeCache  =
                    new TreeCache(client, workerPath);
            TreeCacheListener l =
                    new TreeCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client,
                                               TreeCacheEvent event) {
                            try {
                                ChildData data = event.getData();
                                if(data==null)
                                {
                                    log.info("資料為空");
                                    return;
                                }
                                switch (event.getType()) {
                                    case NODE_ADDED:

                                        log.info("[TreeCache]節點增加, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));

                                        break;
                                    case NODE_UPDATED:
                                        log.info("[TreeCache]節點更新, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    case NODE_REMOVED:
                                        log.info("[TreeCache]節點刪除, path={}, data={}",
                                                data.getPath(), new String(data.getData(), "UTF-8"));
                                        break;
                                    default:
                                        break;
                                }

                            } catch (
                                    UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                    };
            treeCache.getListenable().addListener(l);
            treeCache.start();
            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.createNode(subWorkerPath + i, null);
            }

            Thread.sleep(1000);
            for (int i = 0; i < 3; i++) {
                ZKclient.instance.deleteNode(subWorkerPath + i);
            }
            Thread.sleep(1000);

            ZKclient.instance.deleteNode(workerPath);

            Thread.sleep(Integer.MAX_VALUE);

        } catch (Exception e) {
            log.error("PathCache監聽失敗, path=", workerPath);
        }

    }

執行的結果如下:

\- [TreeCache]節點增加, path=/test/listener/node, data=to set content

 

\- [TreeCache]節點增加, path=/test/listener/node/id-0, data=to set content

\- [TreeCache]節點增加, path=/test/listener/node/id-1, data=to set content

\- [TreeCache]節點增加, path=/test/listener/node/id-2, data=to set content

 

\- [TreeCache]節點刪除, path=/test/listener/node/id-2, data=to set content

\- [TreeCache]節點刪除, path=/test/listener/node/id-1, data=to set content

\- [TreeCache]節點刪除, path=/test/listener/node/id-0, data=to set content

 

\- [TreeCache]節點刪除, path=/test/listener/node, data=to set content

最後,說明下事件的型別,對應於節點的增加、修改、刪除,TreeCache 的事件型別為:

(1)NODE_ADDED

(2)NODE_UPDATED

(3)NODE_REMOVED

這一點,與Path Cache 的事件型別不同,與Path Cache 的事件型別為:

(1)CHILD_ADDED

(2)CHILD_UPDATED

(3)CHILD_REMOVED

寫在最後

​ 下一篇:基於zk,實現分散式鎖。


瘋狂創客圈 億級流量 高併發IM 實戰 系列

  • Java (Netty) 聊天程式【 億級流量】實戰 開源專案實戰