1. 程式人生 > >zookeeper學習之三(Curator客戶端)

zookeeper學習之三(Curator客戶端)

Curator框架是最好用,最流行的zookeeper的客戶端。

它有以下三個優點

1.提供了一套非常友好的操作API;

2. 提供一些高階特性(包括但不僅限於前篇文章中提到的)的封裝

3.易測試

maven依賴如下

Xml程式碼  收藏程式碼
  1. <dependency>  
  2.     <groupId>org.apache.curator</groupId>  
  3.     <artifactId>curator-recipes</artifactId>  
  4.     <version>2.5.0</version>  
  5. </dependency>
      

按照官方給出的文件和包結構,可以輕鬆的看出Curator功能分兩大類,一是對zookeeper的一些基本命令的封裝,比如增刪改查。是他的framework模組,一個是他的高階特性,即recipes模組。

一、framework模組

Curator提供了一套Fluent風格的操作API。這在很多指令碼類語言裡比較流行。

比如他建立client的程式碼是這樣

Java程式碼  收藏程式碼
  1. CuratorFramework client = builder.connectString("192.168.11.56:2180")  
  2.         .sessionTimeoutMs(30000)  
  3.         .connectionTimeoutMs(30000
    )  
  4.         .canBeReadOnly(false)  
  5.         .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
  6.         .namespace(namespace)  
  7.         .defaultData(null)  
  8.         .build();  
  9. client.start();  

 一路點到底,這就是所謂的Fluent風格。 

我們再看增刪改查的

Java程式碼  收藏程式碼
  1. public class CrudExamples {  
  2.     private static CuratorFramework client = ClientFactory.newClient();  
  3.     private static final String PATH = "/crud";  
  4.     public static void main(String[] args) {  
  5.         try {  
  6.             client.start();  
  7.             client.create().forPath(PATH, "I love messi".getBytes());  
  8.             byte[] bs = client.getData().forPath(PATH);  
  9.             System.out.println("新建的節點,data為:" + new String(bs));  
  10.             client.setData().forPath(PATH, "I love football".getBytes());  
  11.             // 由於是在background模式下獲取的data,此時的bs可能為null  
  12.             byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);  
  13.             System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : new byte[0]));  
  14.             client.delete().forPath(PATH);  
  15.             Stat stat = client.checkExists().forPath(PATH);  
  16.             // Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!  
  17.             System.out.println(stat);  
  18.         } catch (Exception e) {  
  19.             e.printStackTrace();  
  20.         } finally {  
  21.             CloseableUtils.closeQuietly(client);  
  22.         }  
  23.     }  
  24. }  

 常用介面有

create()增

delete(): 刪

checkExists(): 判斷是否存在

setData():  改

getData(): 查

所有這些方法都以forpath()結尾,輔以watch(監聽),withMode(指定模式),和inBackground(後臺執行)等方法來使用。

 此外,Curator還支援事務,一組crud操作同生同滅。程式碼如下

Java程式碼  收藏程式碼
  1. /** 
  2.  * 事務操作 
  3.  *  
  4.  * @author shencl 
  5.  */  
  6. public class TransactionExamples {  
  7.     private static CuratorFramework client = ClientFactory.newClient();  
  8.     public static void main(String[] args) {  
  9.         try {  
  10.             client.start();  
  11.             // 開啟事務  
  12.             CuratorTransaction transaction = client.inTransaction();  
  13.             Collection<CuratorTransactionResult> results = transaction.create()  
  14.                     .forPath("/a/path""some data".getBytes()).and().setData()  
  15.                     .forPath("/another/path""other data".getBytes()).and().delete().forPath("/yet/another/path")  
  16.                     .and().commit();  
  17.             for (CuratorTransactionResult result : results) {  
  18.                 System.out.println(result.getForPath() + " - " + result.getType());  
  19.             }  
  20.         } catch (Exception e) {  
  21.             e.printStackTrace();  
  22.         } finally {  
  23.             // 釋放客戶端連線  
  24.             CloseableUtils.closeQuietly(client);  
  25.         }  
  26.     }  
  27. }  

 這段的程式碼的執行結果,由於最後一步delete的節點不存在,所以整個事務commit失敗。失敗的原因會放在Collection<CuratorTransactionResult>中,非常友好。

好了framework部分的內容就這麼多,是不是特別簡單呢。下面就來看看recipes包的內容吧。。

Recipes部分提供的功能官網列的很詳細,點選這裡。注意文章第一段:Curator宣稱,Recipes模組實現了除二階段提交之外的所有zookeeper特性。

二、Recipes模組

主要有

Elections(選舉),Locks(鎖),Barriers(關卡),Atomic(原子量),Caches,Queues等

1、 Elections

選舉主要依賴於LeaderSelector和LeaderLatch2個類。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權。某黨?

這兩者在實現上是可以切換的,直接上程式碼,怎麼切換註釋裡有。由於篇幅所限,這裡僅貼出基於LeaderSelector的選舉,更多程式碼見附件

Java程式碼  收藏程式碼
  1. /** 
  2.  * 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader 
  3.  * 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link} 
  4.  * LeaderLatchClient 
  5.  */  
  6. public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {  
  7.     private final String name;  
  8.     private final LeaderSelector leaderSelector;  
  9.     private final String PATH = "/leaderselector";  
  10.     public LeaderSelectorClient(CuratorFramework client, String name) {  
  11.         this.name = name;  
  12.         leaderSelector = new LeaderSelector(client, PATH, this);  
  13.         leaderSelector.autoRequeue();  
  14.     }  
  15.     public void start() throws IOException {  
  16.         leaderSelector.start();  
  17.     }  
  18.     @Override  
  19.     public void close() throws IOException {  
  20.         leaderSelector.close();  
  21.     }  
  22.     /** 
  23.      * client成為leader後,會呼叫此方法 
  24.      */  
  25.     @Override  
  26.     public void takeLeadership(CuratorFramework client) throws Exception {  
  27.         int waitSeconds = (int) (5 * Math.random()) + 1;  
  28.         System.out.println(name + "是當前的leader");  
  29.         try {  
  30.             Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));  
  31.         } catch (InterruptedException e) {  
  32.             Thread.currentThread().interrupt();  
  33.         } finally {  
  34.             System.out.println(name + " 讓出領導權\n");  
  35.         }  
  36.     }  
Java程式碼  收藏程式碼
  1. /** 
  2.  * leader選舉 
  3.  *  
  4.  * @author shencl 
  5.  */  
  6. public class LeaderSelectorExample {  
  7.     public static void main(String[] args) {  
  8.         List<CuratorFramework> clients = Lists.newArrayList();  
  9.         List<LeaderSelectorClient> examples = Lists.newArrayList();  
  10.         try {  
  11.             for (int i = 0; i < 10; i++) {  
  12.                 CuratorFramework client = ClientFactory.newClient();  
  13.                 LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);  
  14.                 clients.add(client);  
  15.                 examples.add(example);  
  16.                 client.start();  
  17.                 example.start();  
  18.             }  
  19.             System.out.println("----------先觀察一會選舉的結果-----------");  
  20.             Thread.sleep(10000);  
  21.             System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");  
  22.             for (int i = 0; i < 5; i++) {  
  23.                 clients.get(i).close();  
  24.             }  
  25.             // 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出  
  26.             new BufferedReader(new InputStreamReader(System.in)).readLine();  
  27.         } catch (Exception e) {  
  28.             e.printStackTrace();  
  29.         } finally {  
  30.             for (LeaderSelectorClient exampleClient : examples) {  
  31.                 CloseableUtils.closeQuietly(exampleClient);  
  32.             }  
  33.             for (CuratorFramework client : clients) {  
  34.                 CloseableUtils.closeQuietly(client);  
  35.             }  
  36.         }  
  37.     }  
  38. }  

2、locks

curator lock相關的實現在recipes.locks包裡。頂級介面都是InterProcessLock。我們直接看最有代表性的InterProcessReadWriteLock 程序內部讀寫鎖(可重入讀寫鎖)。什麼叫可重入,什麼叫讀寫鎖。不清楚的先查好資料吧。總之讀寫鎖一定是成對出現的。    簡易傳送門

我們先定義兩個任務,可並行的執行的,和互斥執行的。

Java程式碼  收藏程式碼
  1. /** 
  2.  * 並行任務 
  3.  *  
  4.  * @author shencl 
  5.  */  
  6. public class ParallelJob implements Runnable {  
  7.     private final String name;  
  8.     private final InterProcessLock lo