1. 程式人生 > >ZooKeeper客戶端curator元件介紹

ZooKeeper客戶端curator元件介紹

Curator framework提供了高階API, 極大的簡化了ZooKeeper的使用。 它在ZooKeeper基礎上增加了很多特性,可以管理與ZOoKeeper的連線和重試機制。這些特性包括:

  • 自動連線管理
    ** 有些潛在的錯誤情況需要讓ZooKeeper client重建連線和重試。Curator可以自動地和透明地處理這些情況
  • Cleaner API
    簡化原始的ZooKeeper方法,事件等 提供現代的流式介面
  • 技巧(Recipe)實現
    Leader選舉 共享鎖
    Path快取和監控 分散式佇列
    分散式優先順序佇列 

    產生Curator framework 例項

    使用CuratorFrameworkFactory產生framework例項。 CuratorFrameworkFactory 既提供了factory方法也提供了builder來建立例項。 CuratorFrameworkFactory是執行緒安全的
    。你應該在應用中為單一的ZooKeeper叢集共享唯一的CuratorFramework例項。

工廠方法(newClient())提供了一個簡單的方式建立例項。Builder可以使用更多的引數控制生成的例項。一旦生成framework例項, 必須呼叫start方法啟動它。應用結束時應該呼叫close方法關閉它。

CuratorFramework API

CuratorFramework 使用流程風格的介面。 程式碼勝於說教:

1234 client.create().forPath("/head", new byte[0]);client.delete().inBackground().forPath("/head"
);client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);client.getData().watched().inBackground().forPath("/test");

方法

方法名 描述
create() 開始一create操作. 可以呼叫額外的方法(mode or background),最後呼叫forPath()
delete() 開始一個delete操作. 呼叫額外的方法(version or background) , 最好呼叫forPath()
checkExists() 開始一個檢查ZNode是否存在的操作. 呼叫額外的方法 (watch or background), 最後呼叫forPath()
getData() 開始一個獲取ZNode節點資料的操作. 呼叫額外的方法(watch, background or get stat), 最後呼叫forPath()
setData() 開始一個設定ZNode節點資料的操作. 呼叫額外的方法(version or background), 最後呼叫forPath()
getChildren() 開始一個獲取ZNode的子節點列表的操作.呼叫額外的方法(watch, background or get stat), 最後呼叫forPath()
inTransaction() 開始一個原子的ZooKeeper事務. 可以包含 create, setData, check, and/or delete 操作的組合, 然後commit() 作為一個原子操作.

通知 Notifications

服務於後臺操作和監控(watch)的通知通過ClientListener介面釋出。你通過CuratorFramework例項的addListener方法可以註冊監聽器。
Interface CuratorListener

1 eventReceived() A background operation has completed or a watch has triggered. Examine the given event for details

Interface ConnectionStateListener:

12 stateChanged(CuratorFramework client, ConnectionState newState) Called when there is a state change in the connection

Interface UnhandledErrorListener:

12 unhandledError(String message, Throwable e) Called when an exception is caught in a background thread, handler, etc.

ClientEvent

ClientEvent是事件父類, 代表後臺通知和監控的型別。 ClientEvent有用的欄位根據事件的型別(getType()方法獲取)不同而不同。

Event Type Event Methods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GETDATA getResultCode(), getPath(), getStat() and getData()
SETDATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
WATCHED getWatchedEvent()

名稱空間 namespace

因為ZOoKeeper是一個共享的叢集。所以名稱空間約定極為重要,各個應用在使用同一叢集時不會有衝突的ZK path。
CuratorFramework 提供了名稱空間的概念。當生成CuratorFramework 可以設定名稱空間。CuratorFramework在呼叫API會在所有的path前面加上名稱空間。

1234 CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build(); ...client.create().forPath("/test", data);// node was actually written to: "/MyApp/test"

臨時連線

Temporary CuratorFramework instances are meant for single requests to ZooKeeper ensembles over a failure prone network such as a WAN.
臨時的CuratorFramework用來發送單獨請求通過一個易出錯的網路如WAN。CuratorTempFramework 的API是有限制的,而且,連線在不用一段時間後會被關閉。
這個想法基於Camille Fournier的文章: http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html

建立CuratorTempFramework

就像正常的CuratorFramework 例項一樣,CuratorTempFramework依然通過CuratorFrameworkFactory 建立。但是最後不是呼叫build()方法, 而是buildTemp()。buildTemp()建立建立CuratorTempFramework然後在它不用三分鐘後就關閉它。有個buildTemp()過載版本可以設定不活躍(不用)的時間。

Limited API

CuratorTempFramework提供了下列方法:

1234567891011121314151617181920 /** * Stop the client */ public void close(); /** * Start a transaction builder * * @return builder object * @throws Exception errors */ public CuratorTransaction inTransaction() throws Exception; /** * Start a get data builder * * @return builder object * @throws Exception errors */ public TempGetDataBuilder getData() throws Exception;

Apache Curator Utilities

Curator提供了一組工具類和方法用來測試基於Curator的應用。 並且提供了操作ZNode輔助類以及其它一些資料結構

Test Server

curator-test提供了TestingServer類。 這個類建立了一個本地的, 同進程的ZooKeeper伺服器用來測試。

Test Cluster

curator-test提供了TestingCluster類。 這個類建立了一個內部的ZooKeeper叢集用來測試。

ZKPaths

提供了各種靜態方法來操作ZNode:

  • getNodeFromPath: 從一個全路徑中得到節點名, 比如 “/one/two/three” 返回 “three”
  • mkdirs: 確保所有的節點都已被建立
  • getSortedChildren: 得到一個給定路徑的子節點, 按照sequence number排序
  • makePath: 給定父路徑和子節點,建立一個全路徑

EnsurePath

確保一個特定的路徑被建立。當它第一次使用時,一個同步ZKPaths.mkdirs(ZooKeeper, String)呼叫被觸發來確保完整的路徑都已經被建立。後續的呼叫將不是同步操作.
用法:

12345678 EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);...String nodePath = aFullPathToEnsure + "/foo";ensurePath.ensure(zk); // first time syncs and creates if neededzk.create(nodePath, ...);...ensurePath.ensure(zk); // subsequent times are NOPszk.create(nodePath, ...);

注意: 此方法namespace會參與路徑名字的建立。

BlockingQueueConsumer

提供JDK BlockingQueue類似的行為。

QueueSharder

由於zookeeper傳輸層的限制, 單一的佇列如果超過10K的元素會被分割(break)。 這個類為多個分散式佇列提供了一個facade。 它監控佇列, 如果一個佇列超過這個閾值, 一個新的佇列就被建立。 在這些佇列中Put是分散式的。

Reaper and ChildReaper

Reaper
可以用來刪除鎖的父路徑。定時檢查路徑被加入到reaper中。 當檢查時,如果path沒有子節點/路徑, 此路徑將被刪除。每個應用中CLient應該只建立一個reaper例項。必須將lock path加到這個readper中。 reaper會定時的檢查刪除它們。

ChildReaper
用來清除父節點下所有的空節點。定時的呼叫getChildren()並將空節點加入到內部管理的reaper中。

  • 注意: 應該考慮使用LeaderSelector來執行Reapers, 因為它們不需要在每個client執行.

Apache Curator Client

Curator client使用底層的API, 強烈推薦你是用Curator Framework代替使用CuratorZookeeperClient

背景

CuratorZookeeperClient 是ZOoKeeper client的包裝類。但是提供了更簡單方式, 而且可以減少錯誤的發生。它提供了下列的特性:

  • 持續的連線管理 - ZooKeeper有很多的關於連線管理的警告(你可以到ZooKeeper FAQ檢視細節)。CuratorZookeeperClient 可以自動的管理這些事情。
  • retry - 提供一個方式處理retry。
  • Test ZooKeeper server - 提供一個程序內的ZooKeeper測試伺服器用來測試和實驗。

方法

Method Description
Constructor 建立一個給定ZooKeeper叢集的連線。 你可以傳入一個可選的watcher. 必須提供Retry策略
getZooKeeper() 返回管理的ZooKeeper例項. 重要提示: a) 它會花費些許時間等待連線來完成, 在使用其它方法之前你應該校驗連線是否完成. b) 管理的ZooKeeper例項可以根據特定的事件而改變。 不要持有例項太長時間. 總是呼叫getZooKeeper()得到一個新的例項.
isConnected() 返回ZooKeeper client當前連線狀態
blockUntilConnectedOrTimedOut() block知道連線成功或者超時
close() 關閉連線
setRetryPolicy() 改變retry策略
newRetryLoop() 分配一個新的Retry Loop - 詳情看下邊

Retry Loop

由於各種各樣的原因, 在zookeeper叢集上的操作難免遇到失敗的情況。最佳實踐表明應該提供重試機制。Retry Loop 為此而生。 每個操作都被包裝在一個Retry Loop中。下面是一個典型的處理流程:

12345678910111213141516 RetryLoop retryLoop = client.newRetryLoop();while ( retryLoop.shouldContinue() ){ try { // perform your work ... // it's important to re\-get the ZK instance as there may have been an error and the instance was re\-created ZooKeeper zk = client.getZookeeper(); retryLoop.markComplete(); } catch ( Exception e ) { retryLoop.takeException(e); }}

Retry Loop維護一定數量的retry, 它還決定一個錯誤是否可以要執行retry。 假如一個錯誤需要retry,Retry策略被呼叫來決定retry是要要執行,執行多少次才放棄。
很方便地,RetryLoop 提供了一個靜態方法使用Callable來執行一個完整retry loop。

123456789 RetryLoop.callWithRetry(client, new Callable<Void>(){ @Override public Void call() throws Exception { // do your work here - it will get retried if needed return null; }});

Retry策略

retry策略可以改變retry的行為。 它抽象出RetryPolicy介面, 包含一個方法public boolean allowRetry(int retryCount, long elapsedTimeMs);。 在retry被嘗試執行前, allowRetry()被呼叫,並且將當前的重試次數和操作已用時間作為引數. 如果返回true, retry被執行。否則異常被丟擲。
Curator本身提供了幾個策略(在 com.netflix.curator.retry 包下):

Policy Name Description
ExponentialBackoffRetry 重試一定次數,每次重試sleep更多的時間
RetryNTimes 重試N次
RetryOneTime 重試一次
RetryUntilElapsed 重試一定的時間