Curator典型使用場景之事件監聽。
ZooKeeper原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便,需要開發人員自己反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實可以近似看作是一個本地快取檢視和遠端ZooKeeper檢視的對比過程。同時Curator能夠自動為開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程。Cache分為兩類監聽型別:節點監聽和子節點監聽。
NodeCache
NodeCache用於監聽指定ZooKeeper資料節點本身的變化,其構造方法有如下兩個:
- public NodeCache(CuratorFramework client, String path);
- public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);
NodeCache構造方法引數說明如下表所示。
引數名 說明 client Curator客戶端例項 path 資料節點的節點路徑 dataIsCompressed 是否進行資料壓縮
同時,NodeCache定義了事件處理的回撥介面NodeCacheListener。
NodeCacheListener回撥介面定義
public interface NodeCacheListener {
// Called when a change has occurred
public void nodeChanged() throws Exception;
}
當資料節點的內容發生變化的時候,就會回撥該方法。下面通過一個實際例子來看看如何在程式碼中使用NodeCache。
NodeCache使用示例
public class NodeCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
}
});
client.setData().forPath(path, "u".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
在上面的示例程式中,首先構造了一個NodeCache例項,然後呼叫start方法,該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
NodeCache不僅可以用於監聽資料節點的內容變更,也能監聽指定節點是否存在。如果原本節點不存在,那麼Cache就會在節點被建立後觸發NodeCacheListener。但是,如果該資料節點被刪除,那麼Curator就無法觸發NodeCacheListener了。
PathChildrenCache
PathChildrenCache用於監聽指定ZooKeeper資料節點的子節點變化情況。
PathChildrenCache有如下幾個構造方法的定義:
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, ThreadFactory threadFactory);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final ExecutorService executorSerivice);
- public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean datalsCompressed, final CloseableExecutorService executorService);
public PathChildrenCache構造方法引數說明如下表所示。
引數名 說明 client Curator客戶端例項 path 資料節點的節點路徑 dataIsCompressed 是否進行資料壓縮 cacheData 用於配置是否把節點內容快取起來,如果配置為true,那麼客戶端在接收到節點列表變更的同時,也能夠獲取到節點的資料內容;如果配置為false,則無法獲取到節點的資料內容 threadFactory 利用這兩個引數,開發者可以通過構造一個專門的執行緒池,來處理事件通知 executorService 利用這兩個引數,開發者可以通過構造一個專門的執行緒池,來處理事件通知
PathChildrenCache定義了事件處理的回撥介面PathChildrenCacheListener,其定義如下。
PathChildrenListener回撥介面定義
public interface PathChildrenListener {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}
當指定節點的子節點發生變化時,就會回撥該方法。PathChildrenCacheEvent類中定義了所有的事件型別,主要包括新增子節點(CHILD_ADDED)、子節點資料變更(CHILD_UPDATED)和子節點刪除(CHILD_REMOVED)三類。
下面通過一個實際例子來看如何在程式碼中使用PathChildrenCache。
PathChildrenCache使用示例
public class PathChildrenCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
}client.start();
PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener(){
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception{
switch(event.getType()) {
case CHILD_ADDED :
System.out.println("CHILD_ADDED," + event.getData().getPath());
break;
case CHILD_UPDATED :
System.out.println("CHILD_UPDATED," + event.getData().getPath());
break;
case CHILD_REMOVED :
System.out.println("CHILD_REMOVED," + event.getData().getPath());
break;
default:
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(Create.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
執行程式,輸出結果如下:
在上面這個示例程式中,對/zk-book節點進行了子節點變更事件的監聽,一旦該節點新增/刪除子節點,或者子節點資料發生變更,就會回撥PathChildrenCacheListener,並根據對應的事件型別進行相關的處理。同時,我們也看到,對於節點zk=book本身的變更,並沒有通知到客戶端。
另外,和其他ZooKeeper客戶端產品一樣,Curator也無法對二級子節點進行事件監聽。也就是說,如果使用PathChildrenCache對/zk-book進行監聽,那麼當/zk-book/c1/c2節點被建立或刪除的時候,是無法觸發子節點變更事件的。
哈哈