1. 程式人生 > >7.5 zookeeper客戶端curator的基本使用

7.5 zookeeper客戶端curator的基本使用

serve server 超時 one c-c tlist result 強制 car

使用zookeeper原生API實現一些復雜的東西比較麻煩。所以,出現了兩款比較好的開源客戶端,對zookeeper的原生API進行了包裝:zkClient和curator。後者是Netflix出版的,必屬精品,也是最好用的zk的開源客戶端。

一 curator基本API使用

引入依賴:

1         <dependency>
2             <groupId>org.apache.curator</groupId>
3             <artifactId>curator-framework</artifactId
> 4 <version>2.12.0</version> 5 </dependency>

該依賴引入後,默認引入的zookeeper版本是3.4.8。

註意:不要引入>=3.0.0的curator-framework,默認引入的zookeeper版本是3.5.x(該版本還不穩定),目前測試起來還是有點問題的。

完整代碼:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import
org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.framework.api.BackgroundCallback; 6 import org.apache.curator.framework.api.CuratorEvent; 7 import org.apache.curator.retry.ExponentialBackoffRetry; 8 import org.apache.zookeeper.CreateMode; 9 import org.apache.zookeeper.data.Stat;
10 11 import java.util.concurrent.Executors; 12 13 public class CuratorTest { 14 private static CuratorFramework client = CuratorFrameworkFactory.builder() 15 .connectString("10.211.55.4:2181") 16 .sessionTimeoutMs(50000) 17 .connectionTimeoutMs(30000) 18 .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); 19 20 public static void main(String[] args) throws Exception { 21 /** 22 * 創建會話 23 */ 24 client.start(); 25 26 /** 27 * 創建節點 28 * 註意: 29 * 1 除非指明創建節點的類型,默認是持久節點 30 * 2 ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最後的數據節點才是指定類型的節點,其父節點是持久節點 31 */ 32 client.create().forPath("/China");//創建一個初始內容為空的節點 33 client.create().forPath("/America", "zhangsan".getBytes()); 34 client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點 35 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點 36 37 /** 38 * 異步創建節點 39 * 註意:如果自己指定了線程池,那麽相應的操作就會在線程池中執行,如果沒有指定,那麽就會使用Zookeeper的EventThread線程對事件進行串行處理 40 */ 41 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 42 @Override 43 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 44 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 45 + ",type:" + event.getType()); 46 } 47 }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my"); 48 49 client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { 50 @Override 51 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { 52 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() 53 + ",type:" + event.getType()); 54 } 55 }).forPath("/async-curator-zookeeper"); 56 57 /** 58 * 獲取節點內容 59 */ 60 byte[] data = client.getData().forPath("/America"); 61 System.out.println(new String(data)); 62 byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息 63 System.out.println(new String(data2)); 64 /** 65 * 更新數據 66 */ 67 Stat stat = client.setData().forPath("/America"); 68 client.setData().withVersion(4).forPath("/America", "lisi".getBytes()); 69 70 /** 71 * 刪除節點 72 */ 73 client.delete().forPath("/China");//只能刪除葉子節點 74 client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點 75 client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除 76 client.delete().guaranteed().forPath("/America");//註意:由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止 77 78 Thread.sleep(Integer.MAX_VALUE); 79 } 80 }

1 創建會話

curator創建會話有兩種方式,推薦流式API。

1 CuratorFramework client = CuratorFrameworkFactory.builder()
2             .connectString("10.211.55.4:2181")
3             .sessionTimeoutMs(50000)
4             .connectionTimeoutMs(30000)
5             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

參數:

  • connectString:zk的server地址,多個server之間使用英文逗號分隔開
  • connectionTimeoutMs:連接超時時間,如上是30s,默認是15s
  • sessionTimeoutMs:會話超時時間,如上是50s,默認是60s
  • retryPolicy:失敗重試策略
    • ExponentialBackoffRetry:構造器含有三個參數 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep時間,用於計算之後的每次重試的sleep時間,
        • 計算公式:當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
      • maxRetries:最大重試次數
      • maxSleepMs:最大sleep時間,如果上述的當前sleep計算出來比這個大,那麽sleep用這個時間
    • 其他,查看org.apache.curator.RetryPolicy接口的實現類

此時會話還沒創建,使用如下代碼創建會話:

1 client.start();

start()會阻塞到會話創建成功為止。

2 創建節點

2.1 同步創建

1         client.create().forPath("/China");//創建一個初始內容為空的節點
2         client.create().forPath("/America", "zhangsan".getBytes());
3         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//創建一個初始內容為空的臨時節點
4         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//遞歸創建,/Russia是持久節點

註意:

  • 除非指明創建節點的類型,默認是持久節點
  • ZooKeeper規定:所有非葉子節點都是持久節點,所以遞歸創建出來的節點,只有最後的數據節點才是指定類型的節點,其父節點是持久節點
  • creatingParentsIfNeeded():可以實現遞歸創建

2.2 異步創建

 1         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
 2             @Override
 3             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
 4                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
 5                                    + ",type:" + event.getType());
 6             }
 7         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
 8 
 9         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
10             @Override
11             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
12                 System.out.println("當前線程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
13                                    + ",type:" + event.getType());
14             }
15         }).forPath("/async-curator-zookeeper");

註意:

  • 在curator中所有異步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的實現類完成
  • 如果在BackgroundCallback中自己指定了線程池,那麽相應的操作就會在線程池中執行,如果沒有指定,那麽就會使用Zookeeper的EventThread線程對事件進行串行處理,所以上述的兩個輸出分別如下:
    當前線程:pool-3-thread-1,code:0,type:CREATE
    當前線程:main-EventThread,code:0,type:CREATE

3 獲取節點內容

1         byte[] data = client.getData().forPath("/America");
2         System.out.println(new String(data));
3         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //傳入一個舊的stat變量,來存儲服務端返回的最新的節點狀態信息
4         System.out.println(new String(data2));

4 獲取節點子節點列表

1 List<String> children = client.getChildren().forPath("/Russia");

5 更新數據

1         Stat stat = client.setData().forPath("/America");
2         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());

註意:

  • version版本號還是為了實現CAS並發處理,也會強制某個線程必須更新相應的版本的數據

6 刪除節點

1         client.delete().forPath("/China");//只能刪除葉子節點
2         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//刪除一個節點,並遞歸刪除其所有子節點
3         client.delete().withVersion(5).forPath("/America");//強制指定版本進行刪除
4         client.delete().guaranteed().forPath("/America");

註意:

  • deletingChildrenIfNeeded()實現級聯刪除
  • guaranteed()由於一些網絡原因,上述的刪除操作有可能失敗,使用guaranteed(),如果刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功為止

二 curator實現事件監聽

引入兩個依賴:

 1         <dependency>
 2             <groupId>org.apache.curator</groupId>
 3             <artifactId>curator-framework</artifactId>
 4             <version>2.12.0</version>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.apache.curator</groupId>
 8             <artifactId>curator-recipes</artifactId>
 9             <version>2.12.0</version>
10         </dependency>

給出全部代碼:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.framework.recipes.cache.NodeCache;
 6 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 7 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
10 import org.apache.curator.retry.ExponentialBackoffRetry;
11 
12 /**
13  * 事件監聽器
14  */
15 public class CuratorWatcherTest {
16     private static CuratorFramework client = CuratorFrameworkFactory.builder()
17             .connectString("10.211.55.4:2181")
18             .sessionTimeoutMs(50000)
19             .connectionTimeoutMs(30000)
20             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
21             .build();
22 
23     public static void main(String[] args) throws Exception {
24         /**
25          * 創建會話
26          */
27         client.start();
28         client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes());
29         /**
30          * 監聽指定節點本身的變化,包括節點本身的創建和節點本身數據的變化
31          */
32         NodeCache nodeCache = new NodeCache(client,"/book/computer");
33         nodeCache.getListenable().addListener(new NodeCacheListener() {
34             @Override
35             public void nodeChanged() throws Exception {
36                 System.out.println("新的節點數據:" + new String(nodeCache.getCurrentData().getData()));
37             }
38         });
39         nodeCache.start(true);
40 
41         client.setData().forPath("/book/computer","c++".getBytes());
42         /**
43          * 監聽子節點變化情況
44          * 1 新增子節點
45          * 2 刪除子節點
46          * 3 子節點數據變更
47          */
48         PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true);
49         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
50             @Override
51             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
52                 switch (event.getType()){
53                     case CHILD_ADDED:
54                         System.out.println("新增子節點:" + event.getData().getPath());
55                         break;
56                     case CHILD_UPDATED:
57                         System.out.println("子節點數據變化:" + event.getData().getPath());
58                         break;
59                     case CHILD_REMOVED:
60                         System.out.println("刪除子節點:" + event.getData().getPath());
61                         break;
62                     default:
63                         break;
64                 }
65             }
66         });
67         pathChildrenCache.start();
68 
69         client.create().forPath("/book13");
70 
71         client.create().forPath("/book13/car", "bmw".getBytes());
72 
73         client.setData().forPath("/book13/car", "audi".getBytes());
74 
75         client.delete().forPath("/book13/car");
76     }
77 }

curator的事件監聽分為:

  • NodeCache:對節點本身的監聽
    • 監聽節點本身的創建
    • 監聽節點本身的數據的變化
  • PathChildrenCache:對節點的子節點的監聽
    • 監聽新增子節點
    • 監聽刪除子節點
    • 監聽子節點數據變化

註意

  • PathChildrenCache只會監聽指定節點的一級子節點,不會監聽節點本身(例如:“/book13”),也不會監聽子節點的子節點(例如,“/book13/car/color”)

7.5 zookeeper客戶端curator的基本使用