zookeeper學習之三(Curator客戶端)
Curator框架是最好用,最流行的zookeeper的客戶端。
它有以下三個優點
1.提供了一套非常友好的操作API;
2. 提供一些高階特性(包括但不僅限於前篇文章中提到的)的封裝
3.易測試
maven依賴如下
Xml程式碼- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.5.0</version>
- </dependency>
按照官方給出的文件和包結構,可以輕鬆的看出Curator功能分兩大類,一是對zookeeper的一些基本命令的封裝,比如增刪改查。是他的framework模組,一個是他的高階特性,即recipes模組。
一、framework模組
Curator提供了一套Fluent風格的操作API。這在很多指令碼類語言裡比較流行。
比如他建立client的程式碼是這樣
Java程式碼- CuratorFramework client = builder.connectString("192.168.11.56:2180")
- .sessionTimeoutMs(30000)
- .connectionTimeoutMs(30000
- .canBeReadOnly(false)
- .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
- .namespace(namespace)
- .defaultData(null)
- .build();
- client.start();
一路點到底,這就是所謂的Fluent風格。
我們再看增刪改查的
Java程式碼- public class CrudExamples {
- private static CuratorFramework client = ClientFactory.newClient();
- private static final String PATH = "/crud";
- public static void main(String[] args) {
- try {
- client.start();
- client.create().forPath(PATH, "I love messi".getBytes());
- byte[] bs = client.getData().forPath(PATH);
- System.out.println("新建的節點,data為:" + new String(bs));
- client.setData().forPath(PATH, "I love football".getBytes());
- // 由於是在background模式下獲取的data,此時的bs可能為null
- byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
- System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : new byte[0]));
- client.delete().forPath(PATH);
- Stat stat = client.checkExists().forPath(PATH);
- // Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!
- System.out.println(stat);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- CloseableUtils.closeQuietly(client);
- }
- }
- }
常用介面有
create()增
delete(): 刪
checkExists(): 判斷是否存在
setData(): 改
getData(): 查
所有這些方法都以forpath()結尾,輔以watch(監聽),withMode(指定模式),和inBackground(後臺執行)等方法來使用。
此外,Curator還支援事務,一組crud操作同生同滅。程式碼如下
Java程式碼- /**
- * 事務操作
- *
- * @author shencl
- */
- public class TransactionExamples {
- private static CuratorFramework client = ClientFactory.newClient();
- public static void main(String[] args) {
- try {
- client.start();
- // 開啟事務
- CuratorTransaction transaction = client.inTransaction();
- Collection<CuratorTransactionResult> results = transaction.create()
- .forPath("/a/path", "some data".getBytes()).and().setData()
- .forPath("/another/path", "other data".getBytes()).and().delete().forPath("/yet/another/path")
- .and().commit();
- for (CuratorTransactionResult result : results) {
- System.out.println(result.getForPath() + " - " + result.getType());
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // 釋放客戶端連線
- CloseableUtils.closeQuietly(client);
- }
- }
- }
這段的程式碼的執行結果,由於最後一步delete的節點不存在,所以整個事務commit失敗。失敗的原因會放在Collection<CuratorTransactionResult>中,非常友好。
好了framework部分的內容就這麼多,是不是特別簡單呢。下面就來看看recipes包的內容吧。。
Recipes部分提供的功能官網列的很詳細,點選這裡。注意文章第一段:Curator宣稱,Recipes模組實現了除二階段提交之外的所有zookeeper特性。
二、Recipes模組
主要有
Elections(選舉),Locks(鎖),Barriers(關卡),Atomic(原子量),Caches,Queues等
1、 Elections
選舉主要依賴於LeaderSelector和LeaderLatch2個類。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權。某黨?
這兩者在實現上是可以切換的,直接上程式碼,怎麼切換註釋裡有。由於篇幅所限,這裡僅貼出基於LeaderSelector的選舉,更多程式碼見附件
Java程式碼- /**
- * 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader
- * 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link}
- * LeaderLatchClient
- */
- public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
- private final String name;
- private final LeaderSelector leaderSelector;
- private final String PATH = "/leaderselector";
- public LeaderSelectorClient(CuratorFramework client, String name) {
- this.name = name;
- leaderSelector = new LeaderSelector(client, PATH, this);
- leaderSelector.autoRequeue();
- }
- public void start() throws IOException {
- leaderSelector.start();
- }
- @Override
- public void close() throws IOException {
- leaderSelector.close();
- }
- /**
- * client成為leader後,會呼叫此方法
- */
- @Override
- public void takeLeadership(CuratorFramework client) throws Exception {
- int waitSeconds = (int) (5 * Math.random()) + 1;
- System.out.println(name + "是當前的leader");
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- System.out.println(name + " 讓出領導權\n");
- }
- }
- /**
- * leader選舉
- *
- * @author shencl
- */
- public class LeaderSelectorExample {
- public static void main(String[] args) {
- List<CuratorFramework> clients = Lists.newArrayList();
- List<LeaderSelectorClient> examples = Lists.newArrayList();
- try {
- for (int i = 0; i < 10; i++) {
- CuratorFramework client = ClientFactory.newClient();
- LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
- clients.add(client);
- examples.add(example);
- client.start();
- example.start();
- }
- System.out.println("----------先觀察一會選舉的結果-----------");
- Thread.sleep(10000);
- System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");
- for (int i = 0; i < 5; i++) {
- clients.get(i).close();
- }
- // 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出
- new BufferedReader(new InputStreamReader(System.in)).readLine();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- for (LeaderSelectorClient exampleClient : examples) {
- CloseableUtils.closeQuietly(exampleClient);
- }
- for (CuratorFramework client : clients) {
- CloseableUtils.closeQuietly(client);
- }
- }
- }
- }
2、locks
curator lock相關的實現在recipes.locks包裡。頂級介面都是InterProcessLock。我們直接看最有代表性的InterProcessReadWriteLock 程序內部讀寫鎖(可重入讀寫鎖)。什麼叫可重入,什麼叫讀寫鎖。不清楚的先查好資料吧。總之讀寫鎖一定是成對出現的。 簡易傳送門
我們先定義兩個任務,可並行的執行的,和互斥執行的。
Java程式碼- /**
- * 並行任務
- *
- * @author shencl
- */
- public class ParallelJob implements Runnable {
- private final String name;
- private final InterProcessLock lo