1. 程式人生 > >zookeeper客戶端和服務端互動分析

zookeeper客戶端和服務端互動分析

原文連結

ZkClient

       在使用ZooKeeper的Java客戶端時,經常需要處理幾個問題:重複註冊watcher、session失效重連、異常處理。

       要解決上述的幾個問題,可以自己解決,也可以採用第三方的java客戶端來完成。這裡就介紹一種常用的客戶端zkclient,目前已經運用到了很多專案中,知名的有Dubbo、Kafka、Helix。

 

ZKClient的設計

 

 

 

 

ZkClient的元件說明

從上述結構上看,IZKConnection是一個ZkClient與ZooKeeper之間的一個介面卡。在程式碼裡直接使用的是ZKClient,其實質還是委託了zookeeper來處理了。

       前面有一篇文章中,已經說了,使用ZooKeeper客戶端來註冊watcher有幾種方法:1、建立ZooKeeper物件時指定預設的Watcher,2、getData(),3、exists(),4、getchildren。其中getdata,exists註冊的是某個節點的事件處理器(watcher),getchildren註冊的是子節點的事件處理器(watcher)。而在ZKClient中,根據事件型別,分為了節點事件(資料事件)、子節點事件。對應的事件處理器則是IZKDataListener和IZKChildListener。另外加入了Session相關的事件和事件處理器。

       ZkEventThread是專門用來處理事件的執行緒。

重要處理流程說明

 

啟動ZKClient

在建立ZKClient物件時,就完成了到ZooKeeper伺服器連線的建立。具體過程是這樣的:

      

 

1、  啟動時,指定好connection string,連線超時時間,序列化工具等。

2、  建立並啟動eventThread,用於接收事件,並排程事件監聽器Listener的執行。

3、  連線到zookeeper伺服器,同時將ZKClient自身作為預設的Watcher。

 

為節點註冊Watcher

       ZooKeeper的三個方法:getData、getChildren、exists,ZKClient都提供了相應的代理方法。就拿exists來看:

 

可以看到,是否註冊watcher,由hasListeners(path)來決定的。

 

hasListeners就是看有沒有與該資料節點繫結的listener。

 

所以呢,預設情況下,都會自動的為指定的path註冊watcher,並且是預設的watcher(ZKClient)。怎麼才能讓hasListeners判定值為true呢,也就是怎麼才能為path繫結Listener呢?

ZKClient提供了訂閱功能:

 

 

一個新建的會話,只需要在取得響應的資料節點後,呼叫subscribteXxx就可以訂閱上相應的事件了。

 

 

ZooKeeper的變更操作

Zookeeper中提供的變更操作有:節點的建立、刪除,節點資料的修改。

 

建立操作,資料節點分為四種,ZKClient分別為他們提供了相應的代理:

 

刪除節點的操作:

 

 

修改節點資料的操作:

 

 

writeDataReturnStat():寫資料並返回資料的狀態。

updateDataSerialized():修改已序列化的資料。執行過程是:先讀取資料,然後使用DataUpdater對資料修改,最後呼叫writeData將修改後的資料傳送給服務端。

 

客戶端處理變更

       前面已經知道,ZKClient是預設的Watcher,並且在為各個資料節點註冊的Watcher都是這個預設的Watcher。那麼該是如何將各種事件通知給相應的Listener呢?

 

處理過程大致可以概括為下面的步驟:

1、判斷變更型別:變更型別分為State變更、ChildNode變更(建立子節點、刪除子節點、修改子節點資料)、NodeData變更(建立指定node,刪除節點,節點資料變更)。

 

2、取出與path關聯的Listeners,併為每一個Listener建立一個ZKEvent,將ZkEvent交給ZkEventThread處理。

3、ZkEventThread執行緒,拿到ZkEvent後,只需要呼叫ZkEvent的run方法進行處理。

 

從這裡也可以知道,具體的怎麼如何呼叫Listener,還要依賴於ZkEvent的run()實現了。

 

序列化處理

ZooKeeper中,會涉及到序列化、反序列化的操作有兩種:getData、setData。在ZKClient中,分別用readData、writeData來替代了。

對於readData:先呼叫zookeeper的getData,然後進行使用ZKSerializer進行反序列化工作。

對於writeData:先使用ZKSerializer將物件序列化後,再呼叫zookeeper的setData。

 

ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?

 

Watcher自動重註冊:這個要是依賴於hasListeners()的判斷,來決定是否再次註冊。如果對此有不清晰的,可以看上面的流程處理的說明

       Session失效重連:如果發現會話過期,就先關閉已有連線,再重新建立連線。

       異常處理:對比ZooKeeper和ZKClient,就可以發現ZooKeeper的所有操作都是拋異常的,而ZKClient的所有操作,都不會拋異常的。在發生異常時,它或做日誌,或返回空,或做相應的Listener呼叫。

 

 

相比於ZooKeeper官方客戶端,使用ZKClient時,只需要關注實際的Listener實現即可。所以這個客戶端,還是推薦大家使用的。

另外,是關於zkclient的一些介面,我們可以通過這些介面直接呼叫,使其完成一些相應的任務。

 

1.建立會話

 

[java] view plain copy

  1. public class createSession {  
  2.   
  3.     public static void main(String[] args) {  
  4.         //zk叢集的地址  
  5.         String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  6.       
  7.         /** 
  8.          * 建立會話 
  9.          * new SerializableSerializer() 建立序列化器介面,用來序列化和反序列化 
  10.          */  
  11.         ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  12.           
  13.         System.out.println("conneted ok!");  
  14.           
  15.     }  
  16. }  


2.建立節點

 

 

[java] view plain copy

  1. public class createNode {  
  2.   
  3.     public static void main(String[] args) {  
  4.         //zk叢集的地址  
  5.         String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  6.         ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  7.           
  8.         System.out.println("conneted ok!");  
  9.           
  10.         User user = new User();  
  11.         user.setId(1);  
  12.         user.setName("testUser");  
  13.           
  14.         /** 
  15.          * "/testUserNode" :節點的地址 
  16.          * user:資料的物件 
  17.          * CreateMode.PERSISTENT:建立的節點型別 
  18.          */  
  19.         String path = zkClient.create("/testUserNode", user, CreateMode.PERSISTENT);  
  20.         //輸出建立節點的路徑  
  21.         System.out.println("created path:"+path);  
  22.     }  
  23. }  

 

 

[java] view plain copy

  1. //注意:一定要實現序列化介面  implements Serializable  
  2. public class User implements Serializable{  
  3.       
  4.     private Integer id;  
  5.     private String name;  
  6.       
  7.     public Integer getId() {  
  8.         return id;  
  9.     }  
  10.     public void setId(Integer id) {  
  11.         this.id = id;  
  12.     }  
  13.     public String getName() {  
  14.         return name;  
  15.     }  
  16.     public void setName(String name) {  
  17.         this.name = name;  
  18.     }  
  19. }  


3.獲取節點中的資料

 

 

[java] view plain copy

  1. public class getData {  
  2.     public static void main(String[] args) {  
  3.                 //zk叢集的地址  
  4.                 String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  5.                 ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  6.                 System.out.println("conneted ok!");  
  7.                   
  8.                 Stat stat = new Stat();  
  9.                 //獲取 節點中的物件  
  10.                 User  user = zkClient.readData("/testUserNode",stat);  
  11.                 System.out.println(user.getName());  
  12.                 System.out.println(stat);  
  13.     }  
  14. }  


4.判斷節點是否存在

 

 

[java] view plain copy

  1. public class getData {  
  2.     public static void main(String[] args) {  
  3.                 //zk叢集的地址  
  4.                 String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  5.                 ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  6.                 System.out.println("conneted ok!");  
  7.                   
  8.                 boolean e = zkClient.exists("/testUserNode");  
  9.                 //返回 true表示節點存在 ,false表示不存在  
  10.                 System.out.println(e);  
  11.     }  
  12. }  


5.刪除節點

 

 

[java] view plain copy

  1. public class getData {  
  2.     public static void main(String[] args) {  
  3.                 //zk叢集的地址  
  4.                 String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  5.                 ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  6.                 System.out.println("conneted ok!");  
  7.                   
  8.                 //刪除單獨一個節點,返回true表示成功  
  9.                 boolean e1 = zkClient.delete("/testUserNode");  
  10.                 //刪除含有子節點的節點  
  11.                 boolean e2 = zkClient.deleteRecursive("/test");  
  12.                   
  13.                 //返回 true表示節點成功 ,false表示刪除失敗  
  14.                 System.out.println(e1);  
  15.     }  
  16. }  

 

6.更新資料

 

[java] view plain copy

  1. public static void main(String[] args) {  
  2.                 //zk叢集的地址  
  3.                 String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  4.                 ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  5.                 System.out.println("conneted ok!");  
  6.                   
  7.                 User user = new User();  
  8.                 user.setId(2);  
  9.                 user.setName("testUser2");  
  10.                 /** 
  11.                  * testUserNode 節點的路徑 
  12.                  * user 傳入的資料物件 
  13.                  */  
  14.                 zkClient.writeData("/testUserNode", user);  
  15.     }  


7.訂閱節點的資訊改變(建立節點,刪除節點,新增子節點)

 

 

[java] view plain copy

  1. public class SubscribeChildChanges {  
  2.     private static class ZKChildListener implements IZkChildListener{  
  3.         /** 
  4.          * handleChildChange: 用來處理伺服器端傳送過來的通知 
  5.          * parentPath:對應的父節點的路徑 
  6.          * currentChilds:子節點的相對路徑 
  7.          */  
  8.         public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {  
  9.               
  10.             System.out.println(parentPath);  
  11.             System.out.println(currentChilds.toString());  
  12.               
  13.         }  
  14.           
  15.     }  
  16.       
  17.     public static void main(String[] args) throws InterruptedException {  
  18.         //zk叢集的地址  
  19.         String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  20.         ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  21.         System.out.println("conneted ok!");  
  22.         /** 
  23.          * "/testUserNode" 監聽的節點,可以是現在存在的也可以是不存在的 
  24.          */  
  25.         zkClient.subscribeChildChanges("/testUserNode3", new ZKChildListener());  
  26.         Thread.sleep(Integer.MAX_VALUE);  
  27.     }  
  28. }  


8.訂閱節點的資料內容的變化

 

 

 

[java] view plain copy

  1. public class SubscribeDataChanges {  
  2.     private static class ZKDataListener implements IZkDataListener{  
  3.   
  4.         public void handleDataChange(String dataPath, Object data) throws Exception {  
  5.               
  6.             System.out.println(dataPath+":"+data.toString());  
  7.         }  
  8.   
  9.         public void handleDataDeleted(String dataPath) throws Exception {  
  10.               
  11.             System.out.println(dataPath);  
  12.               
  13.         }  
  14.          
  15.           
  16.     }  
  17.       
  18.     public static void main(String[] args) throws InterruptedException {  
  19.         //zk叢集的地址  
  20.         String ZKServers = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";  
  21.         ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());  
  22.         System.out.println("conneted ok!");  
  23.   
  24.         zkClient.subscribeDataChanges("/testUserNode", new ZKDataListener());  
  25.         Thread.sleep(Integer.MAX_VALUE);  
  26.           
  27.     }  
  28. }