1. 程式人生 > >ZooKeeper之Java客戶端API使用—讀取資料。

ZooKeeper之Java客戶端API使用—讀取資料。

        讀取資料,包括子節點列表的獲取和節點資料的獲取。ZooKeeper分別提供了不同的API來獲取資料。

getChildren

        客戶端可以通過ZooKeeper的API來獲取一個節點的所有子節點,有如下8個介面可供使用:

  • List<String> getChildren(final String path , Watcher watcher)
  • List<String> getChildren(String path , boolean watch)
  • void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
  • void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
  • List<String> getChildren(final String path , Watcher watcher, Stat stat)
  • List<String> getChildren(String path , boolean watch, Stat stat)
  • void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
  • void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

        這裡列出的8個API包含了同步和非同步的介面,API方法的引數說明如下表所示。

引數名 說明
path 指定資料節點的節點路徑,即API呼叫的目的是獲取該節點的子節點列表
watcher 註冊的Watcher。一旦在本次子節點獲取之後,子節點列表發生變更的話,那麼就會向客戶端傳送通知。該引數允許傳入null
watch 表明是否需要註冊一個Watcher。這裡就要使用預設Watcher了。如果這個引數是true,那麼ZooKeeper客戶端會自動使用上文中提到的那個預設Watcher;如果是false,表明不需要註冊Watcher
cb 註冊一個非同步回撥函式
ctx 用於傳遞上下文資訊的物件
stat 指定資料節點的節點狀態資訊。用法是在介面中傳入一箇舊的stat變數,該stat變數會在方法執行執行過程中,被來自伺服器響應的新stat物件替換

Watcher       

        註冊Watcher,如果ZooKeeper客戶端在獲取到指定節點的子節點列表後,還需要訂閱這個子節點列表的變化通知,那麼就可以通過註冊一個Watcher來實現。當有子節點被新增或是刪除時,伺服器就會向客戶端傳送一個NodeChildren Change(EventType.NodeChildrenChanged)型別的事件通知。需要注意的是,在服務端傳送給客戶端的事件通知中,是不包含最新的節點列表的,客戶端必須主動重新進行獲取。通常客戶端在收到這個事件通知後,就可以再次獲取最新的子節點列表了。

stat

        描述節點狀態資訊的物件:stat。stat物件中記錄了一個節點的基本屬性資訊,例如列電建立時事務ID(cZxid)、最後一次修改的事務ID(mZxid)和節點資料內容的長度(dataLength)等。有時候,我們不僅需要獲取節點最新的子節點列表,還要獲取這個節點最新的節點狀態資訊。對於這種情況,我們可以將一箇舊的stat變數傳入API介面,該stat變數會在方法執行過程中,被來自服務端響應的新stat物件替換。

使用同步API獲取子節點列表

// ZooKeeper API 獲取子節點列表,使用同步(sync)介面

public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

pirvate static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetChildren_API_Sync_Usage());

connectedSemaphore.await();

zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

List<String> childrenList = zk.getChildren(path , true);

System.out.println(childrenList);

zk.create(path+"/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if (KeeperState.SyncConnected == event.getState()) {

if (EventType.None == event.getType() && null == event.getPath) {

connectedSemaphore.countDown();

} else if (event.getType() == EventType.NodeChildrenChanged) {

try {

System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));

} catch (Exception e) {}

}

}

}

}

        執行程式,輸出結果如下:

        在上面這個程式中,我們首先建立了一個父節點/zk-book,以及一個子節點/zk-book/c1。然後呼叫getChildren的同步介面來獲取/zk-book節點下的所有子節點,同時在介面呼叫的時候註冊了一個Watcher。之後,我們繼續向/zk-book節點建立子節點/zk-book/c2。由於之前我們對/zk-book節點註冊了一個Watcher,因此,一旦此時有子節點被建立,ZooKeeper服務端就會客戶端發出一個“子節點變更”的事件通知,於是,客戶但在收到這個事件通知之後就可以再次呼叫getChildren方法來獲取新的子節點列表。

        另外,從輸出結果中我們還可以發現,呼叫getChildren獲取到節點列表,都是資料節點的相對節點路徑,例如上面輸出結果中的c1和c2,事實上,完整的ZNode路徑應該是/zk-book/c1和zk-book/c2。

        關於Watcher,這裡簡單提一點,ZooKeeper服務端在向客戶端傳送Watcher “NodeChildrenChanged”事件通知的時候,僅僅只會發出一個通知,而不會把節點的變化情況傳送給客戶端,需要客戶端自己重新獲取。另外,由於Watcher通知是一次性的,即一旦觸發一次通知後,該Watcher就失效了,因此客戶端需要反覆註冊Watcher。

使用非同步API獲取子節點列表

// ZooKeeper API 獲取子節點列表,使用非同步(async)介面

public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetChildren_API_ASync_Usage());

connectedSemaphore.await();

zk.create(path , "".getBytes() , Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create(path+"/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

zk.getChildren(path, true, new IChildren2Callback(), null);

zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if(KeeperState.SyncConnected == event.getState()) {

if(KeeperState.SyncConnected == event.getState()) {

if(EventType.None == event.getType() && null == event.getPath()) {

connectedSemaphore.countDown();

} else if (event.getType == EventType.NodeChildrenChanged) {

try {

System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));

} catch (Exception e) {}

}

}

}

}

}

class IChildren2Callback implements AsyncCallback.Children2Callback {

public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

System.out.println("Get Children znode result:[response code:" + rc + ", param path:" + path + ", ctx: " + ", children list: " + ", stat:" + stat);

}

}

        執行程式,輸出結果如下:

        在上面這個程式中,我們將子節點列表的獲取邏輯進行了非同步化。非同步介面通常會應用在這樣的使用場景中:應用啟動的時候,會獲取一些配置資訊,例如“機器列表”,這些配置通常比較大,並且不希望配置的獲取影響應用的主流程。

getData

        客戶端可以通過ZooKeeper的API來獲取一個節點的資料內容,有如下4個介面:

  • byte[] getData(final String path, Watcher watcher, Stat stat)
  • byte[] getData(String path, boolean watch, Stat stat)
  • void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
  • void getData(String path, boolean watch, DataCallback cb, Object ctx)

        這裡列出的4個API包含了同步和非同步的介面,API方法的引數說明如下表所示。

引數名 說明
path 指定資料節點的節點路徑,即API呼叫的目的是獲取該節點的資料內容
watcher 註冊的Watcher。一旦之後節點內容有變更,就會向客戶端傳送通知。該引數允許傳入null
stat 指定資料節點的節點狀態資訊。用法是在介面中傳入一箇舊的stat變數,該stat變數會在方法執行過程中,被來自服務端響應的新stat物件替換。
watch 表明是否需要註冊一個Watcher。這裡使用到預設Watcher。如果這個引數是true,那麼ZooKeeper客戶端會自動使用上文中提到的那個預設Watcher:如果是false,表明不需要註冊Watcher
cb 註冊一個非同步回撥函式
ctx 用於傳遞上下文資訊的物件

        getData介面和上文中的getChildren介面的用法基本相同,這裡主要看一看註冊的Watcher有什麼不同之處。客戶端在獲取一個節點的資料內容的時候,是可以進行Watcher註冊的,這樣一來,一旦該節點的狀態發生變更,那麼ZooKeeper服務端就會向客戶端傳送一個NodeDataChanged(EventType.NodeDataChanged) 的事件通知。

        另外,API返回結果的型別是byte[],目前ZooKeeper只支援這種型別的資料儲存,所以在獲取資料的時候也是返回此型別。

使用同步API獲取節點資料內容

// ZooKeeper API 獲取節點資料內容,使用同步(sync)介面

public class ZooKeeper_GetData_API_Sync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

pirvate static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new ZooKeeper_GetData_API_Sync_Usage());

connectedSemaphore.await();

zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

System.out.println(new String(zk.getData(path, true, stat)));
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
zk.setData(path, "123".Bytes(), -1);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if (KeeperState.SyncConnected == event.getState()) {

if (EventType.None == event.getType() && null == event.getPath) {

connectedSemaphore.countDown();

} else if (event.getType() == EventType.NodeChildrenChanged) {

try {

System.out.println(new String(zk.getData(event.getPath(), true, stat)));

System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());

} catch (Exception e) {}

}

}

}

}

        執行程式,輸出結果如下:

        在上面這個程式中,我們首先建立了一個節點/zk-book,並初始化其資料內容為“123”.然後呼叫getData的同步介面來獲取/zk-book節點的資料內容,呼叫的同時註冊了一個Watcher。之後,我們同樣以“123”去更新將該節點的資料內容,此時,由於我們之前在該節點上註冊了一個Watcher,因此,一旦該節點的資料發生變化,ZooKeeper服務端就會向客戶端發出一個“資料變更”的事件通知,於是,客戶端可以收到這個事件通知後,再次呼叫getData介面來獲取新的資料內容。

        另外,在呼叫getData介面的同時,我們傳入了一個stat變數,在ZooKeeper客戶端的內部實現中,會從服務端的響應中獲取到資料節點的最新節點狀態資訊,來替換這個客戶端的舊狀態。

        我們重點再來看下執行上面這個程式的輸出結果中,前後兩次呼叫getData介面的返回值。第一次的輸出結果如下:

       第二次的輸出結果如下:

        第一次是客戶端主動呼叫getData介面來獲取資料;第二次則是節點資料變更後,服務端傳送Watcher事件通知給客戶端後,客戶端再次呼叫getData介面來獲取資料。兩次呼叫的輸出結果中,節點資料內容的值並沒有變化。既然節點的資料內容並沒有變化,那麼ZooKeeper服務端為什麼會向客戶端傳送Watcher事件通知呢。這裡,我們必須明確一個概念:節點的資料內容或是節點的資料版本變化,都被看作是ZooKeeper節點的變化。明白這個概念後,再回過頭看上面的結果輸出,可以看出,該節點在Zxid為“253404961568”時被建立,在Zxid為“253404961576”時被更新,於是節點的資料版本從“0”變化到“1”.所以,這裡我們要明確的一點是,蜀軍欸容或是資料版本變化,都會觸發服務端的NodeDataChanged通知。

使用非同步API獲取節點資料內容

// ZooKeeper API 獲取節點資料內容,使用非同步(async)介面

public class GetData_API_ASync_Usage implements Watcher {

private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

private static ZooKeeper zk = null;

public static void main(String[] args) throws Exception {

String path = "/zk-book";

zk = new ZooKeeper("domain1.book.zookeeper:2181", 5000, new GetData_API_ASync_Usage());

connectedSemaphore.await();

zk.create(path , "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode, EPHEMERAL);
zk.getData(path, true, new IDataCallback(), null);
zk.setData(path, "123".getBytes(), -1);

Thread.sleep(Integer.MAX_VALUE);

}

public void process(WatchedEvent event) {

if(KeeperState.SyncConnected == event.getState()) {

if(KeeperState.SyncConnected == event.getState()) {

if(EventType.None == event.getType() && null == event.getPath()) {

connectedSemaphore.countDown();

} else if (event.getType == EventType.NodeChildrenChanged) {

try {

zk.getData(event.getPath(), true, new IDataCallback(), null);

} catch (Exception e) {}

}

}

}

}

}

class IChildren2Callback implements AsyncCallback.Children2Callback {

public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

System.out.println(rc +", " + path + ", " + new String(data));

System.out.println(stat.getCzxid() + "," + stat.getVersion());

}

}

        執行程式,輸出結果如下:

        上面就是使用getData的非同步介面來獲取節點資料內容的示例程式。