1. 程式人生 > >Apache Curator客戶端

Apache Curator客戶端

一:Apache Curator簡介

1. Curator主要從以下幾個方面降低了zk使用的複雜性

  1. 重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略,並且內部也提供了幾種標準的重試策略(比如指數補償)

  2. 連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理

  3. zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理.並在需要的情況, 重建zk例項,保證與zk叢集的可靠連線

  4. 各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景),這些實現都遵循了zk的最佳實踐,並考慮了各種極端情況

2. Curator主要解決了三類問題

  1. 封裝ZooKeeper client與ZooKeeper server之間的連線處理
  2. 提供了一套Fluent風格的操作API
  3. 提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝

3. Curator聲稱的一些亮點

  1. 日誌工具

    • 內部採用SLF4J 來輸出日誌
    • 採用驅動器(driver)機制, 允許擴充套件和定製日誌和跟蹤處理
    • 提供了一個TracerDriver介面, 通過實現addTrace()和addCount()介面來整合使用者自己的跟蹤框架
  2. 和Curator相比, 另一個ZooKeeper客戶端——zkClient的不足之處
    • 文件幾乎沒有
    • 異常處理弱爆了(簡單的丟擲RuntimeException)
    • 重試處理太難用了
    • 沒有提供各種使用場景的實現
  3. 對ZooKeeper自帶客戶端(ZooKeeper類)的”抱怨”
    • 只是一個底層實現
    • 要用需要自己寫大量的程式碼
    • 很容易誤用
    • 需要自己處理連線丟失, 重試等

4. Curator幾個組成部分

  1. Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法
  2. Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理
  3. Recipes: 實現了通用ZooKeeper的recipe, 該元件建立在Framework的基礎之上
  4. Utilities:各種ZooKeeper的工具類
  5. Errors: 異常處理, 連線, 恢復等
  6. Extensions: recipe擴充套件

二:基本示例

1. 基本操作

public class CuratorTest {
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    static final int sessionTimeoutMs = 5000;
    static int connectionTimeoutMs = 3000;

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // namespace: 名稱空間,即根節點,當多個應用使用同一個zk時能夠避免衝突, 操作時不需要顯式使用該根名稱空間(根節點)
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace("myapp")
                .build();

        // 連線
        client.start();


        String nodePath = "/node1";
        Stat stat = client.checkExists().forPath(nodePath);
        if (stat != null) {
            // 只能刪除葉子節點, version=-1 表示不需要校驗版本,如果版本不對會丟擲異常(BadVersion)
            // version版本相同才會更新
            client.setData().withVersion(-1).forPath(nodePath, "node1 new value".getBytes());

            byte[] bytes = client.getData().forPath(nodePath);
            System.out.println("node1=" + new String(bytes));

            // 讀取節點資料,獲取該節點的Stat
            Stat stat1 = new Stat();
            client.getData().storingStatIn(stat1).forPath(nodePath);
            System.out.println("stat1=" + stat1);

            client.delete().withVersion(-1).forPath(nodePath);
        } else {
            client.create().forPath(nodePath, "node1 default value".getBytes());
        }

        // creatingParentContainersIfNeeded遞迴建立節點
        String result = client.create()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath("/node1/node11/node111", "node111 init value".getBytes());
        System.out.println(result);
        client.setData().forPath("/node1/node11/node111", "node111 new value".getBytes());



        client.create().withMode(CreateMode.PERSISTENT)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .inBackground()
                .forPath(nodePath + "/node2", "node2 init value".getBytes());

       client.getChildren().forPath(nodePath).forEach(node -> {
            try {
                String fullPath = nodePath + "/" + node;
                System.out.println(node + " = " + new String(client.getData().forPath(fullPath)));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });


        // 刪除當前節點和子節點
        // guaranteed是一個保障措施只要客戶端會話有效那麼Curator會在後臺持續進行刪除操作,直到刪除節點成功。
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/node1");
        Thread.sleep(1000);

        // 不存在則建立,存在則更新
        client.create().orSetData().forPath("/node3", "node3 new value".getBytes());

        client.close();
    }
}

2. 事務操作

public class CuratorTest {
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    static final int sessionTimeoutMs = 5000;
    static int connectionTimeoutMs = 3000;

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // namespace: 根節點,操作時不需要顯式使用該根名稱空間(根節點)
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace("myapp")
                .build();

        // 連線
        client.start();

        // 定義事務操作
        CuratorOp createCuratorOp = client.transactionOp().create().forPath("/node1", "node1 init value".getBytes());
        CuratorOp setCuratorOp = client.transactionOp().setData().forPath("/node1", "node1 new value".getBytes());
        CuratorOp delCuratorOp = client.transactionOp().delete().forPath("/node1");

        // 返回值:為每個操作的結果
        List<CuratorTransactionResult> results = client.transaction().forOperations(createCuratorOp, setCuratorOp, delCuratorOp);
        for (CuratorTransactionResult result: results) {
            System.out.println("執行結果:" + result.getForPath() + "\t" + result.getType() + "\t" + result.getError() + "\t" + result.getResultStat());
        }

        client.close();
    }
}

3.監聽

方式一:全域性監聽

// 只有inBackground()才能被監聽到
CuratorListener listener = new CuratorListener() {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) {
        System.out.println(event.getType() + "\t" + event.getPath());
    }
};
client.getCuratorListenable().addListener(listener);

// inBackground 後臺執行,即非同步執行
client.create().inBackground().forPath("/node1", "node1 value".getBytes());
client.setData().inBackground().forPath("/node1", "node1 new value".getBytes());
client.delete().inBackground().forPath("/node1");

//        WATCHED   null
//        CREATE    /node1
//        SET_DATA  /node1
//        DELETE    /node1
//        CLOSING   null

方式二:監聽單個操作

BackgroundCallback callback = new BackgroundCallback() {
    @Override
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.getType() + "\t" + event.getPath());
    }
};

// inBackground 後臺執行,即非同步執行
client.create().inBackground(callback).forPath("/node1", "node1 value".getBytes());
client.setData().inBackground(callback).forPath("/node1", "node1 new value".getBytes());
client.delete().inBackground(callback).forPath("/node1");

//        CREATE        /node1
//        SET_DATA      /node1
//        DELETE        /node1

方式三:TreeCache

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

NodeCache 好像只能監控刪除節點和修改節點,沒有監控建立節點

final NodeCache cache = new NodeCache(client, "/node1");
NodeCacheListener listener = () -> {
    ChildData data = cache.getCurrentData();
    if (null != data) {
        System.out.println("路徑=" + data.getPath() + "\t data=" + new String(data.getData()));
    } else {
        System.out.println("節點被刪除!");
    }
};
cache.getListenable().addListener(listener);
cache.start();

client.create().forPath("/node1", "node1 value".getBytes());
client.setData().forPath("/node1", "node1 new value".getBytes());
client.delete().forPath("/node1");

cache.close();

// 路徑=/node1     data=node1 new value
// 節點被刪除!

PathChildrenCache 好像不能監控修改節點

PathChildrenCache cache = new PathChildrenCache(client, "/node1", true);
cache.start();
PathChildrenCacheListener cacheListener = (client1, event) -> {
    System.out.println("事件型別:" + event.getType());
    if (null != event.getData()) {
        System.out.println("節點資料:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
    }
};
cache.getListenable().addListener(cacheListener);

client.create().creatingParentsIfNeeded().forPath("/node1/node11", "node1 value".getBytes());
client.setData().forPath("/node1/node11", "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath("/node1");

cache.close();

//        事件型別:CONNECTION_RECONNECTED
//        事件型別:CHILD_ADDED
//        節點資料:/node1/node11 = node1 new value
//        事件型別:CHILD_REMOVED
//        節點資料:/node1/node11 = node1 new value

TreeCache = PathCache + NodeCache 建立、修改、刪除都能監控到

TreeCacheListener cacheListener = (client1, event) -> {
    System.out.println("事件型別:" + event.getType() +
            "\t路徑:" + (null != event.getData() ? event.getData().getPath() : null));
};
TreeCache cache = new TreeCache(client, "/node1");
cache.start();
cache.getListenable().addListener(cacheListener);

client.create().creatingParentsIfNeeded().forPath("/node1/node11", "node1 value".getBytes());
client.setData().forPath("/node1/node11", "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath("/node1");

Thread.sleep(1000);
cache.close();

//        事件型別:INITIALIZED      路徑:null
//        事件型別:NODE_ADDED       路徑:/node1
//        事件型別:NODE_ADDED       路徑:/node1/node11
//        事件型別:NODE_UPDATED     路徑:/node1/node11
//        事件型別:NODE_REMOVED     路徑:/node1/node11
//        事件型別:NODE_REMOVED     路徑:/node1

ConnectionStateListener
監聽連線狀態,當連線丟失時去重新連線

public class CuratorTest {
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    static final int sessionTimeoutMs = 5000;
    static int connectionTimeoutMs = 3000;
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    static CuratorFramework client;
    static ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                System.out.println("連線成功");
                countDownLatch.countDown();
            } else if (newState == ConnectionState.LOST) {
                System.out.println("連線丟失");

                try {
                    System.out.println("重新初始化開始");
                    reInitClient();
                    System.out.println("重新初始化完畢");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) throws Exception {
        initClient();
        client.close();
    }

    public static void initClient() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // namespace: 根節點,操作時不需要顯式使用該根名稱空間(根節點)
        client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace("myapp")
                .build();

        // 連線
        client.start();
        client.getConnectionStateListenable().addListener(connectionStateListener);

        countDownLatch.await();
    }


    public static void reInitClient() throws Exception {
        // 先關閉client
        if (client != null) {
            client.close();
            client = null;
        }

        // 然後再初始化客戶端
        initClient();
    }
}

CuratorWatcher 好像只能監聽到setData

CuratorWatcher pathWatcher = new CuratorWatcher() {
    @Override
    public void process(WatchedEvent event) throws Exception {
        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
            String path = event.getPath();
            String value = new String(client.getData().forPath(path));
            System.out.println(path + "=" +value);
        }

        System.out.println(event);
    }
};

String path = "/node1";
client.create().forPath(path, "node1 value".getBytes());

String value = new String(client.getData().usingWatcher(pathWatcher).forPath(path));
System.out.println(path + "=" + value);

client.create().forPath(path + "/node11", "node1 value".getBytes());
client.setData().forPath(path, "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath(path);

Thread.sleep(5000);

// node1=node1 value
// node1=node1 new value
// WatchedEvent state:SyncConnected type:NodeDataChanged path:/node1