簡介
Curator是Netflix公司開源的一套Zookeeper客戶端框架。了解過Zookeeper原生API都會清楚其復雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反復註冊Watcher和NodeExistsException等。目前已經作為Apache的頂級項目出現,是最流行的Zookeeper客戶端之一。從編碼風格上來講,它提供了基於Fluent的編程風格支持。
除此之外,Curator還提供了Zookeeper的各種應用場景:Recipe、共享鎖服務、Master選舉機制和分布式計數器等。
項目及依賴
關於項目的介紹信息可以參考Apache官網提供的關於Curator的資料信息。項目在GitHub上的開源地址隨著從Netflix轉移到Apache也發生了變化。原地址為:https://github.com/Netflix/curator,新地址為:https://github.com/apache/curator。
版本
目前Curator有2.x.x和3.x.x兩個系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,並且提供了一些諸如動態重新配置、watch刪除等新特性。
項目組件
名稱 | 描述 |
---|---|
Recipes | Zookeeper典型應用場景的實現,這些實現是基於Curator Framework。 |
Framework | Zookeeper API的高層封裝,大大簡化Zookeeper客戶端編程,添加了例如Zookeeper連接管理、重試機制等。 |
Utilities | 為Zookeeper提供的各種實用程序。 |
Client | Zookeeper client的封裝,用於取代原生的Zookeeper客戶端(ZooKeeper類),提供一些非常有用的客戶端特性。 |
Errors | Curator如何處理錯誤,連接問題,可恢復的例外等。 |
Maven依賴
Curator的jar包已經發布到Maven中心,由以下幾個artifact的組成。根據需要選擇引入具體的artifact。但大多數情況下只用引入curator-recipes即可。
GroupID/Org | ArtifactID/Name | 描述 |
---|---|---|
org.apache.curator | curator-recipes | 所有典型應用場景。需要依賴client和framework,需設置自動獲取依賴。 |
org.apache.curator | curator-framework | 同組件中framework介紹。 |
org.apache.curator | curator-client | 同組件中client介紹。 |
org.apache.curator | curator-test | 包含TestingServer、TestingCluster和一些測試工具。 |
org.apache.curator | curator-examples | 各種使用Curator特性的案例。 |
org.apache.curator | curator-x-discovery | 在framework上構建的服務發現實現。 |
org.apache.curator | curator-x-discoveryserver | 可以喝Curator Discovery一起使用的RESTful服務器。 |
org.apache.curator | curator-x-rpc | Curator framework和recipes非Java環境的橋接。 |
根據上面的描述,開發人員大多數情況下使用的都是curator-recipes的依賴,此依賴的maven配置如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
由於版本兼容原因,采用了2.x.x的最高版本。
案例及功能說明
創建會話
Curator的創建會話方式與原生的API和ZkClient的創建方式區別很大。Curator創建客戶端是通過CuratorFrameworkFactory工廠類來實現的。其中,此工廠類提供了三種創建客戶端的方法。
前兩種方法是通過newClient來實現,僅參數不同而已。
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
使用上面方法創建出一個CuratorFramework之後,需要再調用其start()方法完成會話創建。
實例代碼:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy); client.start();
其中參數RetryPolicy提供重試策略的接口,可以讓用戶實現自定義的重試策略。默認提供了以下實現,分別為ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。
進一步查看源代碼可以得知,其實這兩種方法內部實現一樣,只是對外包裝成不同的方法。它們的底層都是通過第三個方法builder來實現的。
實例代碼:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client =CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .build(); client.start();
觀察上面的實例,我們可以看到此處已經使用了Fluent風格的編碼。其中namespace(“demo”)這項設置用來定義此會話的獨立命名空間,隨後的相應操作都是在此命名空間下進行操作。
重試策略
上面的例子中使用到了ExponentialBackoffRetry重試策略實現。此策略先給定一個初始化sleep時間baseSleepTimeMs,在此基礎上結合重試次數,通過以下代碼計算當前需要的sleep時間:
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))); if ( sleepMs > maxSleepMs ){ sleepMs = maxSleepMs; }
隨著重試次數的增加,計算出的sleep時間也會越來越大。如果超過maxSleepMs則使用maxSleepMs的時間。其中maxRetries限制了最大的嘗試次數。
創建節點
Curator創建節點的方法也是基於Fluent風格編碼,原生API中的參數很多都轉化為一層層的方法調用來進行設置。下面簡單介紹一下常用的幾個節點創建場景。
(1)創建一個初始內容為空的節點
client.create().forPath(path);
Curator默認創建的是持久節點,內容為空。
(2)創建一個包含內容的節點
client.create().forPath(path,"我是內容".getBytes());
Curator和ZkClient不同的是依舊采用Zookeeper原生API的風格,內容使用byte[]作為方法參數。
(3)創建臨時節點,並遞歸創建父節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
此處Curator和ZkClient一樣封裝了遞歸創建父節點的方法。在遞歸創建父節點時,父節點為持久節點。
刪除節點
刪除節點的方法也是基於Fluent方式來進行操作,不同類型的操作調用新增不同的方法調用即可。
(1)刪除一個子節點
client.delete().forPath(path);
(2)刪除節點並遞歸刪除其子節點
client.delete().deletingChildrenIfNeeded().forPath(path);
(3)指定版本進行刪除
client.delete().withVersion(1).forPath(path);
如果此版本已經不存在,則刪除異常,異常信息如下。
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
(4)強制保證刪除一個節點
client.delete().guaranteed().forPath(path);
只要客戶端會話有效,那麽Curator會在後臺持續進行刪除操作,直到節點刪除成功。比如遇到一些網絡異常的情況,此guaranteed的強制刪除就會很有效果。
讀取數據
讀取節點數據內容API相當簡單,Curator提供了傳入一個Stat,使用節點當前的Stat替換到傳入的Stat的方法,查詢方法執行完成之後,Stat引用已經執行當前最新的節點Stat。
// 普通查詢 client.getData().forPath(path); // 包含狀態查詢 Stat stat = new Stat(); client.getData().storingStatIn(stat()).forPath(path);
更新數據
更新數據,如果未傳入version參數,那麽更新當前最新版本,如果傳入version則更新指定version,如果version已經變更,則拋出異常。
// 普通更新 client.setData().forPath(path,"新內容".getBytes()); // 指定版本更新 client.setData().withVersion(1).forPath(path);
版本不一直異常信息:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for
異步接口
在使用以上針對節點的操作API時,我們會發現每個接口都有一個inBackground()方法可供調用。此接口就是Curator提供的異步調用入口。對應的異步處理接口為BackgroundCallback。此接口指提供了一個processResult的方法,用來處理回調結果。其中processResult的參數event中的getType()包含了各種事件類型,getResultCode()包含了各種響應碼。
重點說一下inBackground的以下接口:
public T inBackground(BackgroundCallback callback, Executor executor);
此接口就允許傳入一個Executor實例,用一個專門線程池來處理返回結果之後的業務邏輯。
Tags: Netflix 計數器 watch 客戶端 程序
文章來源: