1. 程式人生 > >跟著例項學習ZooKeeper的用法: Curator框架應用

跟著例項學習ZooKeeper的用法: Curator框架應用

前面的幾篇文章介紹了一些ZooKeeper的應用方法, 本文將介紹Curator訪問ZooKeeper的一些基本方法, 而不僅僅限於指定的Recipes, 你可以使用Curator API任意的訪問ZooKeeper。

CuratorFramework

Curator框架提供了一套高階的API, 簡化了ZooKeeper的操作。 它增加了很多使用ZooKeeper開發的特性,可以處理ZooKeeper叢集複雜的連線管理和重試機制。 這些特性包括:

  • 自動化的連線管理: 重新建立到ZooKeeper的連線和重試機制存在一些潛在的錯誤case。 Curator幫助你處理這些事情,對你來說是透明的。
  • 清理API:
    • 簡化了原生的ZooKeeper的方法,事件等
    • 提供了一個現代的流式介面
  • 提供了Recipes實現: 如前面的文章介紹的那樣,基於這些Recipes可以建立很多複雜的分散式應用

Curator框架通過CuratorFrameworkFactory以工廠模式和builder模式建立CuratorFramework實 例。 CuratorFramework例項都是執行緒安全的,你應該在你的應用中共享同一個CuratorFramework例項.

工廠方法newClient()提供了一個簡單方式建立例項。 而Builder提供了更多的引數控制。一旦你建立了一個CuratorFramework例項,你必須呼叫它的start()啟動,在應用退出時呼叫close()方法關閉.

下面的例子演示了兩種建立Curator的方法:

package com.colobu.zkrecipe.framework;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import
org.apache.curator.utils.CloseableUtils; public class CreateClientExample { private static final String PATH = "/example/basic"; public static void main(String[] args) throws Exception { TestingServer server = new TestingServer(); CuratorFramework client = null; try { client = createSimple(server.getConnectString()); client.start(); client.create().creatingParentsIfNeeded().forPath(PATH, "test".getBytes()); CloseableUtils.closeQuietly(client); client = createWithOptions(server.getConnectString(), new ExponentialBackoffRetry(1000, 3), 1000, 1000); client.start(); System.out.println(new String(client.getData().forPath(PATH))); } catch (Exception ex) { ex.printStackTrace(); } finally { CloseableUtils.closeQuietly(client); CloseableUtils.closeQuietly(server); } } public static CuratorFramework createSimple(String connectionString) { // these are reasonable arguments for the ExponentialBackoffRetry. // The first retry will wait 1 second - the second will wait up to 2 seconds - the // third will wait up to 4 seconds. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // The simplest way to get a CuratorFramework instance. This will use default values. // The only required arguments are the connection string and the retry policy return CuratorFrameworkFactory.newClient(connectionString, retryPolicy); } public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) { // using the CuratorFrameworkFactory.builder() gives fine grained control // over creation options. See the CuratorFrameworkFactory.Builder javadoc details return CuratorFrameworkFactory.builder().connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) // etc. etc. .build(); } }

Curator框架提供了一種流式介面。 操作通過builder串聯起來, 這樣方法呼叫類似語句一樣。

client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");

CuratorFramework提供的方法:

方法名 描述
create() 開始建立操作, 可以呼叫額外的方法(比如方式mode 或者後臺執行background) 並在最後呼叫forPath()指定要操作的ZNode
delete() 開始刪除操作. 可以呼叫額外的方法(版本或者後臺處理version or background)並在最後呼叫forPath()指定要操作的ZNode
checkExists() 開始檢查ZNode是否存在的操作. 可以呼叫額外的方法(監控或者後臺處理)並在最後呼叫forPath()指定要操作的ZNode
getData() 開始獲得ZNode節點資料的操作. 可以呼叫額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後呼叫forPath()指定要操作的ZNode
setData() 開始設定ZNode節點資料的操作. 可以呼叫額外的方法(版本或者後臺處理) 並在最後呼叫forPath()指定要操作的ZNode
getChildren() 開始獲得ZNode的子節點列表。 以呼叫額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後呼叫forPath()指定要操作的ZNode
inTransaction() 開始是原子ZooKeeper事務. 可以複合create, setData, check, and/or delete 等操作然後呼叫commit()作為一個原子操作提交

後臺操作的通知和監控可以通過ClientListener介面釋出. 你可以在CuratorFramework例項上通過addListener()註冊listener, Listener實現了下面的方法:

  • eventReceived() 一個後臺操作完成或者一個監控被觸發

事件型別以及事件的方法如下:

Event Type Event Methods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GETDATA getResultCode(), getPath(), getStat() and getData()
SETDATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
WATCHED getWatchedEvent()

還可以通過ConnectionStateListener介面監控連線的狀態。 強烈推薦你增加這個監控器。

你可以使用名稱空間Namespace避免多個應用的節點的名稱衝突。 CuratorFramework提供了名稱空間的概念,這樣CuratorFramework會為它的API呼叫的path加上名稱空間:

CuratorFramework    client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
 ...
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"

建立builder時不是呼叫build()而是呼叫buildTemp()。 3分鐘不活動連線就被關閉,你也可以指定不活動的時間。 它只提供了下面幾個方法:

    public void     close();
    public CuratorTransaction inTransaction() throws Exception;
    public TempGetDataBuilder getData() throws Exception;

操作方法

上面的表格列出了CuratorFramework可以用的操作。 下面就是一個例子:

package com.colobu.zkrecipe.framework;

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;

public class CrudExample {

    public static void main(String[] args) {

    }

    public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given ZNode with the given data
        client.create().forPath(path, payload);
    }

    public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given EPHEMERAL ZNode with the given data
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }

    public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given
        // data using Curator protection.
        return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
    }

    public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
        // set data for the given node
        client.setData().forPath(path, payload);
    }

    public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
        // this is one method of getting event/async notifications
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // examine event for details
            }
        };
        client.getCuratorListenable().addListener(listener);
        // set data for the given node asynchronously. The completion
        // notification
        // is done via the CuratorListener.
        client.setData().inBackground().forPath(path, payload);
    }

    public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

    public static void delete(CuratorFramework client, String path) throws Exception {
        // delete the given node
        client.delete().forPath(path);
    }

    public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
        // delete the given node and guarantee that it completes
        client.delete().guaranteed().forPath(path);
    }

    public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
        /**
         * Get children and set a watcher on the node. The watcher notification
         * will come through the CuratorListener (see setDataAsync() above).
         */
        return client.getChildren().watched().forPath(path);
    }

    public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher) throws Exception {
        /**
         * Get children and set the given watcher on the node.
         */
        return client.getChildren().usingWatcher(watcher).forPath(path);
    }
}

事務

上面也提到, CuratorFramework提供了事務的概念,可以將一組操作放在一個原子事務中。 什麼叫事務? 事務是原子的, 一組操作要麼都成功,要麼都失敗。

下面的例子演示了事務的操作:

package com.colobu.zkrecipe.framework;

import java.util.Collection;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

public class TransactionExample {

    public static void main(String[] args) {

    }

    public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception {
        // this example shows how to use ZooKeeper's new transactions
        Collection<CuratorTransactionResult> results = client.inTransaction().create().forPath("/a/path", "some data".getBytes())
                .and().setData().forPath("/another/path", "other data".getBytes())
                .and().delete().forPath("/yet/another/path")
                .and().commit(); // IMPORTANT!
                                                                                                                                // called
        for (CuratorTransactionResult result : results) {
            System.out.println(result.getForPath() + " - " + result.getType());
        }
        return results;
    }

    /*
     * These next four methods show how to use Curator's transaction APIs in a
     * more traditional - one-at-a-time - manner
     */
    public static CuratorTransaction startTransaction(CuratorFramework client) {
        // start the transaction builder
        return client.inTransaction();
    }

    public static CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction) throws Exception {
        // add a create operation
        return transaction.create().forPath("/a/path", "some data".getBytes()).and();
    }

    public static CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction) throws Exception {
        // add a delete operation
        return transaction.delete().forPath("/another/path").and();
    }

    public static void commitTransaction(CuratorTransactionFinal transaction) throws Exception {
        // commit the transaction
        transaction.commit();
    }
}