1. 程式人生 > >Curator典型使用場景之事件監聽。

Curator典型使用場景之事件監聽。

        ZooKeeper原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便,需要開發人員自己反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實可以近似看作是一個本地快取檢視和遠端ZooKeeper檢視的對比過程。同時Curator能夠自動為開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程。Cache分為兩類監聽型別:節點監聽和子節點監聽。

NodeCache

        NodeCache用於監聽指定ZooKeeper資料節點本身的變化,其構造方法有如下兩個:

  • public NodeCache(CuratorFramework client, String path);
  • public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

        NodeCache構造方法引數說明如下表所示。

引數名 說明
client Curator客戶端例項
path 資料節點的節點路徑
dataIsCompressed 是否進行資料壓縮

        同時,NodeCache定義了事件處理的回撥介面NodeCacheListener。

NodeCacheListener回撥介面定義

public interface NodeCacheListener {

// Called when a change has occurred

public void nodeChanged() throws Exception;

}

        當資料節點的內容發生變化的時候,就會回撥該方法。下面通過一個實際例子來看看如何在程式碼中使用NodeCache。

NodeCache使用示例

public class NodeCache_Sample {

static String path = "/zk-book/nodecache";

static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

public static void main(String[] args) throws Exception {

client.start();

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());

final NodeCache cache = new NodeCache(client, path, false);

cache.start(true);

cache.getListenable().addListener(new NodeCacheListener() {

@Override

public void nodeChanged() throws Exception {

System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));

}

});

client.setData().forPath(path, "u".getBytes());

Thread.sleep(1000);

client.delete().deletingChildrenIfNeeded().forPath(path);

Thread.sleep(Integer.MAX_VALUE);

}

}

        在上面的示例程式中,首先構造了一個NodeCache例項,然後呼叫start方法,該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。

        NodeCache不僅可以用於監聽資料節點的內容變更,也能監聽指定節點是否存在。如果原本節點不存在,那麼Cache就會在節點被建立後觸發NodeCacheListener。但是,如果該資料節點被刪除,那麼Curator就無法觸發NodeCacheListener了。

PathChildrenCache

        PathChildrenCache用於監聽指定ZooKeeper資料節點的子節點變化情況。

        PathChildrenCache有如下幾個構造方法的定義:

  • public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
  • public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
  • public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, ThreadFactory threadFactory);
  • public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final ExecutorService executorSerivice);
  • public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final CloseableExecutorService executorService);

        public PathChildrenCache構造方法引數說明如下表所示。

引數名 說明
client Curator客戶端例項
path 資料節點的節點路徑
dataIsCompressed 是否進行資料壓縮
cacheData 用於配置是否把節點內容快取起來,如果配置為true,那麼客戶端在接收到節點列表變更的同時,也能夠獲取到節點的資料內容;如果配置為false,則無法獲取到節點的資料內容
threadFactory 利用這兩個引數,開發者可以通過構造一個專門的執行緒池,來處理事件通知
executorService 利用這兩個引數,開發者可以通過構造一個專門的執行緒池,來處理事件通知

        PathChildrenCache定義了事件處理的回撥介面PathChildrenCacheListener,其定義如下。

PathChildrenListener回撥介面定義

public interface PathChildrenListener {

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;

}

        當指定節點的子節點發生變化時,就會回撥該方法。PathChildrenCacheEvent類中定義了所有的事件型別,主要包括新增子節點(CHILD_ADDED)、子節點資料變更(CHILD_UPDATED)和子節點刪除(CHILD_REMOVED)三類。

        下面通過一個實際例子來看如何在程式碼中使用PathChildrenCache。

PathChildrenCache使用示例

public class PathChildrenCache_Sample {

static String path = "/zk-book/nodecache";

static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

public static void main(String[] args) throws Exception {

client.start();

PathChildrenCache cache = new PathChildrenCache(client, path, true);

cache.start(StartMode.POST_INITIALIZED_EVENT);

cache.getListenable().addListener(new PathChildrenCacheListener(){

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)  throws Exception{

switch(event.getType()) {

case CHILD_ADDED :

System.out.println("CHILD_ADDED," + event.getData().getPath());

break;

case CHILD_UPDATED :

System.out.println("CHILD_UPDATED," + event.getData().getPath());

break;

case CHILD_REMOVED :

System.out.println("CHILD_REMOVED," + event.getData().getPath());

break;

default:

break;

}

}

});

client.create().withMode(CreateMode.PERSISTENT).forPath(path);

Thread.sleep(1000);

client.create().withMode(Create.PERSISTENT).forPath(path + "/c1");

Thread.sleep(1000);

client.delete().forPath(path + "/c1");

Thread.sleep(1000);

client.delete().forPath(path);

Thread.sleep(Integer.MAX_VALUE);

}

}

        執行程式,輸出結果如下:

        在上面這個示例程式中,對/zk-book節點進行了子節點變更事件的監聽,一旦該節點新增/刪除子節點,或者子節點資料發生變更,就會回撥PathChildrenCacheListener,並根據對應的事件型別進行相關的處理。同時,我們也看到,對於節點zk=book本身的變更,並沒有通知到客戶端。

        另外,和其他ZooKeeper客戶端產品一樣,Curator也無法對二級子節點進行事件監聽。也就是說,如果使用PathChildrenCache對/zk-book進行監聽,那麼當/zk-book/c1/c2節點被建立或刪除的時候,是無法觸發子節點變更事件的。

哈哈