1. 程式人生 > >Zookeeper(七)開源客戶端

Zookeeper(七)開源客戶端

經過上面兩節部落格的介紹,朋友們應該會開始簡單地使用ZooKeeper了。

在這一偏文章中,我們將圍繞ZkClient和Curator這兩個開源的ZooKeeper客戶端產品,再來進一步看看如何更好地使用ZooKeeper。

ZkClient

ZkClient是Github上一個開源的Zookeeper客戶端,是由Datameer的工程師StefanGroschupf和Peter Voss一起開發的。ZkClient在ZooKeeper原生API介面之上進行了包裝,是一個更易用的Zookeeper客戶端。同時,ZkClient在內部實現了諸如Session超時重連/Watcher反覆註冊等功能,是的ZooKeeper客戶端的這些繁瑣的細節工作對開發人員透明。 在本節中,我們將從建立會話,建立節點,讀取資料,更新資料,刪除節點和檢測節點是否存在等方面來介紹如何使用ZkClient這個ZooKeeper客戶端。當然,由於底層實現還是對ZooKeeper原生API的包裝,因此本節不會進行太多原理性的描述。 我們先看下ZkClient的Maven依賴:
<dependencies>
       <dependency>
              <groupId>org.apache.zookeeper</groupId>
               <artifactId>zookeeper<artifactId>
               <version>${zookeeper.version}</version>
       </dependency>
       <dependency>
               <groupId>com.github.sgroschupf</groupId>
               <artifactId>zkclient</artifactId>
               <version>${zkclient.version}</version>
       </dependency>
</dependencies>

建立會話

上面兩節中,我們已經介紹瞭如何通過例項化一個ZooKeeper物件來完成會話的建立。在本節中,我們將介紹如何使用ZkClient來完成同樣的操作。在ZkClient中,有如下7種構造方法:
public ZkClient(String serverstring)
public ZkClient(String zkServers,int connectionTimeout)
public ZkClient(String zkServers,int sessionTimeout,int connectionTimeout)
public ZkClient(String zkServers,int sessionTimeout,int connectionTimeout,ZkSerializer zkSerializer)
public ZkClient(IZkConnection connection)
public ZkClient(IZkConnection connection,int connectionTimeout)
public ZkClient(IZkConnection zkConnection,int connectinTimeout,ZkSerializer zkSerializer)

ZkClient構造方法引數說明,表5-11
引數名 說明
zkServers 指ZooKeeper伺服器列表,由英文狀態逗號分開的host:port字串組成,每一個代表一臺ZooKeeper機器,例如192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
sessionTimeout 會話超時時間,單位為毫秒。預設是30000ms
connectionTimeout 連結建立超時時間,單位為毫秒。此引數表明如果在這個時間段內還是無法和ZooKeeper建立連線,那麼就放棄連線,直接丟擲異常
connection IZkConnection介面的實現類
zkSerializer 自定義序列化器

注意,在ZkClient的構造方法中,很多引數和ZooKeeper原生的構造方法中的引數一致,所以表5-11只是坐簡要介紹,具體可以參見表5-12中的相關介紹。

在講解使用ZooKeeper原生APi建立會話的時候,我們提到:ZooKeeper會話的建立是一個非同步的過程。對於ZooKeeper客戶端的這個特點,開發人員需要自己來進行等待處理。而ZkClient通過內部包裝,將這個非同步的會話建立同步化了,這對於開發者的使用來說非常方便。

接下來看看IZkConnection介面。org.IOItec.zkclient,IZkConnection介面是對ZooKeeper原生介面最直接的包裝,也是和ZooKeeper最直接的互動層,裡面包含了添,刪,改,查等一些列介面的定義。ZkClient預設提供對IZkConnection介面的兩種實現,ZkConnection和InMemoryConnection,前者是我們最常用的實現方式。通常開發人員不需要對IZkConnection進行改造,直接使用ZkConnection這個實現就可以完成絕大部分的業務貢獻。

最後我們來看看ZkClient和ZooKeeper原生構造方法的最大區別,那就是在ZkClient的構造方法中,不再提供傳入Watcher物件的引數了。那麼,客戶端如何去監聽服務端的相關事件呢?別擔心,ZkClient引入了大多數java程式設計師都使用過的Listener來實現Watcher註冊。值得一提的是,ZkClient從API級別來支援Watcher監聽的註冊,這樣的用法更貼近Java工程師的習慣。關於事件監聽的註冊方法,在後面會做詳細講解。

清單5-17,使用ZkClient建立會話

public class Create_Session_Sample{
      public static void main(String[] args) throws IOException,InterruptedException{
           ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181",5000);
           System.out.println("ZooKeeper session established.");  
      }
}

執行程式,輸出結果如下:

ZooKeeper session established

上面這個示例展示瞭如何使用ZkClient來建立會話。

建立節點


ZkClient中提供了以下一系列介面來建立節點,開發者可以通過這些介面來進行各種型別的節點建立:


清單5-18 使用ZkClient建立節點

packege book.chapter
import org.IOItec.zkclient.ZkClient;

public class Create_Node_Sample{
      public static void main(String[] args) throws Excpetion{
           ZkClient zkClient = new ZkClient("domain1.book.zookeeper:2181",5000);
           String path = "/zk-book/c1";
           zkClient.createPersistent(path,true); 
      }
}

在上面這個示例程式中,我們使用ZkClient的createPersisten介面建立節點,並且設定createParents引數為true,表明需要遞迴建立父節點。很顯然,使用ZkClient省去了很多繁瑣的工作。

另外,ZkClient的API中還提供了支援非同步建立節點的方法,鑑於非同步方式的使用和上文中講解的非常類似,這裡不再贅述。

關於讀取,修改節點,操作都是類似,下面我們主要來看看ZooKeeper的另一種開源開戶端:Curator。

Curator


Curator是Netflix公司開源的一套ZooKeeper客戶端框架,作者是Jordan Zimmerman。和ZiClient一樣,Curator解決了很多ZooKeeper客戶端非常底層的細節開發工作,包括連線重連,反覆註冊Watcher和NodeExistsExceptino異常等,目前已經成為了Apathe的頂級專案,是全世界使用最廣泛的ZooKeeper客戶端之一,Patrick Hunt(ZooKeeper程式碼的核心提交者)以一句“Guava is頭Java what Curator is to ZooKeeper“(Curator對於ZooKeeper,可以說就像Guava工具集對於java平臺一樣,作用巨大)對其進行了高度評價。

除了封裝一些開發人員不需要特別關注的底層細節之外,Curator還在ZooKeeper原生API的基礎上進行了包裝,提供了一套易用性和可讀性更強的Fluent縫合的客戶端API框架。

除此之外,Curator中還提供了ZooKeeper各種應用場景(Recipe,如共享鎖服務/Master選舉機制和分散式計數器等)的抽象封裝。

在講解API之前,首先來看看Curator的Maven依賴:

<dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.4.2</version>
</dependency>

建立會話



使用Curator客戶端建立會話的過程馭ZooKeeper的原生API和ZkClient兩種建立會話的方式有很大的不同。具體如下。


1.使用CuratorFrameworkFactory這個工廠類的兩個靜態方法來建立一個客戶端:


static CuratorFramework newClient(String connectionString,RetryPolicy retyrPolicy);

static CuratorFramework newClient(String conectString,int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retryPolicy);

2.通過呼叫CuratorFramework中的start()方法來啟動會話。

表5-20對構造方法中的各引數進行了說明。

表5-20

引數名 說明
connectString 指ZooKeeper伺服器列表,由英文狀態逗號分開的host:port字串組成,每一個都代表一臺ZooKeeper機器,例如,192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
retryPolicy 重試策略,預設主要有四種實現,分別是Exponential BackoffRetry,RetryNTimes,RetyrOneTime,RetryUnitElapsed
sessionTimeoutMs 會話超時時間,單位為毫秒。預設是60000ms
connectionTimeoutMs 連線建立超時時間,單位為毫秒。預設是15000ms

使用Curator建立會話


清單5-21

package book.chapter
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apche.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

//使用Curator來建立一個ZooKeeper客戶端
public class Create_Session_Sample{
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
      CuratorFramework client = CuratorFrameworkFactory.newClient("domain1.book.zookeeper:2181",5000,3000,retryPolicy);
      client.start();
      Thread.sleep(Integer.MAX_VALUE);
}

在上面這個示例程式中,我們首先建立了一個名為ExponentialBackoffRetry的重試策略,該重試策略是Curator預設提供的幾種重試策略之一,其構造方法如下:

ExponentialBackoffRetry(int baseSleepTimeMs,int maxRetries);

ExponetialBackoffRetry(int baseSleepTimeMs,int maxRetries,int maxSleepMs);

ExponetialBackoffRetry構造方法引數說明如表5-22所示

表5-22

引數名 說明
baseSleepTimeMs 初始sleep時間
maxRetries 最大重試次數
maxSleepMs 最大sleep時間
使用Fluent風格的API介面來建立會話

Curator提供的API介面在設計上最大的亮點在於其遵循了Fluent設計風格,這也是和ZooKeeper原生API以及ZkClient客戶端有很大不同的地方。清單5-22展示瞭如何使用Fluent風格的API介面來建立會話。

清單5-22

package book.chapter;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
//使用Fluent風格的API介面來建立一個ZooKeeper客戶端
public class Create_Session_Sample_fluent{
      public static void main(String[] args) throws Exception{
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
            CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(retryPolicy).build();
             client.start();
             Thread.sleep(Integer.MAX_VALUE);
      }
}

使用Curator建立含隔離名稱空間的會話

為了實現不同的ZooKeeper也無語之間的隔離,往往會為每個業務分配一個獨立的名稱空間,即指定一個ZooKeeper根路徑。例如,下面所示的程式碼片段中定義了某一個客戶端的獨立名稱空間/base,那麼該客戶端對ZooKeeper上資料節點的任何操作,都是基於該相對目錄進行的:

CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();

我們後續會介紹更多關於客戶端隔離名稱空間的內容。

建立節點

Curator中提供了一系列Fluent風格的介面,開發人員可以通過對其進行自由組合來完成各種型別節點的建立。

清單5-23

CuratorFramework
  --public CreateBuilder create();
CreateBuilder
  --public
ProtectACLCreateModePathAndBytesable<String>creatingParentsIfNeeded();
CreateModable
  --public T withMode(CreateMode mode);
PathAndBytesable<T>
  --public T forPath(String path,byte[] data) throws Exception;
  --public TforPath(String path) throws Exception;

以上就是一系列最常用的建立節點API,下面通過一些場景來說明如何使用這些API。

建立一個節點,初始內容為空

client.create().forPath(path);

注意,如果沒有設定節點屬性,那麼Curator預設建立的是持久節點,內容預設是空。這裡的client是指上文中提到的一個已經完成會話建立並啟動的Curator客戶端例項,即CuratorFramework物件例項。

建立一個節點,附帶初始內容

client.create().forPath(path,"init".getBytes());

也可以在建立節點的時候寫入初始節點內容。和ZkClient不同的是,Curator任然是按照ZooKeeper原生API風格,使用byte[]作為方法引數。

建立一個臨時節點,並自動遞迴建立父節點

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPaht(path);

這個藉口非常有用,在使用ZooKeeper的過程中,開發人員經常會碰到NoNodeException異常,其中一個可能的原因就是試圖對一個不存在的父節點建立子節點。因此,開發人員不得不在每次建立節點之前,都判斷一下該父節點是否存在——這個處理讓人厭倦。在使用Curator之後,通過呼叫creatingParentsIfNeeded介面,Curator就能夠自動地遞迴建立所有需要的父節點。

同時需要注意的一點是,由於在ZooKeeper中規定了所有非葉子節點必須為持久節點,呼叫上面這個API之後,只有paht引數對應的資料節點時臨時節點,其父節點均為持久節點。

下面通過一個實際的例子來看看如何在程式碼中使用這些API

清單5-24

package book.chapter
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

//使用Curator建立節點
public class Create_Node_Sample{
      static String path = "/zk-book/c1";
      static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000,3)).build();

      public static void main(String[] args) throws Exception{
           client.start();
           client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());
       }
}

刪除節點

在Curator中,可以通過以下API來刪除指定節點:

清單5-25

CuratorFramework
  --pulbic DeleteBuilder delete();
Versionable<T>
   --public T withVersion(int version);
DeleteBuilder
  --public DeleteBuilderBase guaranteed();
PathAndBytesable<T>
 --public T forPath(String path,byte[] data) throws Exception;
  --public T forPath(String path) throws Exception;

以上就是一系列最常用的刪除節點API,下面通過一些場景來說明如何使用這些API。

刪除一個節點

client.delete().forPath(path);

注意,使用該介面,只能刪除葉子節點。

刪除一個節點,並遞迴刪除其所有子節點

client.delete().deletingChildrenIfNeeded().forPath(path);

刪除一個節點,強制指定版本進行刪除

client.delete().withVersion(version).forPath(path);

刪除一個節點,強制保證刪除

client.delete().guaranteed().forPath(path);

注意,guaranteed()介面是一個保障措施,只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操作,知道節點刪除成功。

下面通過一個實際例子來看看如何在程式碼中使用這些API。

清單5-26 Curator刪除節點API示例

package book.chapter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

//使用Curator刪除節點
public class Del_Data_Sample{
      static String path = "/zk-book/c1";
      static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:22181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000,3)).build();
    public static void main(String[] args) throws Exception{
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPaht(path,"init".getBytes());
         Stat stat = new Stat();
         client.getData().storingStatIn(stat).forPath(path);
          client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
}
}

上面這個程式就是一個簡單的節點刪除例項。這裡重點講解guaranteed()這個方法。正如該介面的官方文件中所註明的,在ZooKeeper客戶端使用過程中,可能會碰到這樣的問題:客戶端執行一個刪除節點操作,但是由於一些網路原因,導致刪除操作失敗。對於這個異常,在有些場景中是致命的,如“Master選舉”——在這個場景中,ZooKeeper客戶端通常是通過節點的創建於刪除來實現的。針對這個問題,Curator中引入了一種重試機制:如果我們呼叫了guaranteed()方法,那麼當客戶端碰到上面這些網路異常的時候,會記錄下這次失敗的刪除操作,只要客戶端會話有效,那麼其就會在後臺反覆重試,知道節點刪除成功。通過這樣的措施,就可以保證節點刪除操作一定會生效。

讀取資料

下面來看看如何通過Curator介面來獲取節點的資料內容。

清單5-27

CuratorFramework

  --public GetDataBuilder getData();

Statable<T>

  --public T storingStatIn(Stat stat);

Pathable<T>

  --public T forPath(String path) throws Exception;

以上就是一系列最常用的讀取資料節點內容的API介面,下面通過一些場景來說明如何使用這些API。

讀取一個節點的資料內容

client.getData().forPath(path);

注意,該介面呼叫後的返回值是byte[]。

讀取一個節點的資料內容,同時獲取到該節點的stat

client.getData().storingStatIn(stat).forPath(path);

Curator通過傳入一箇舊的stat變數的方式來儲存服務端返回的最新的節點狀態資訊。

下面通過一個實際例子來看看如何在程式碼中使用這些API。

清單5-28 Curator讀取資料API、例項

paclage bppl/cja[ter;
import org.apache.curator.framework.CuratorFramework;
import org.apche.curator.framewor.CuratorFrameworkFactory;
import org.apche.curator.retry.ExponentialBackofRetyr;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

//使用Curatro獲取資料內容
public class Get_Data_Sample{
     static String path = "/zk-book";
     static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000,3)).build();
      public static main(String[] args) throws Exception{
           client.start();
           client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());
            Stat stat = new Stat();
            System.out.println(new String(client.getDate().storingStatIn().forPath(path)));
       }
}

更新資料

在Curator中,可以通過以下API來更新指定節點的資料。

清單5-29 Curator更新資料API

CuratorFramework

    --public SetDateBuilder setData();

Versionable<T>

   --public T withVersion(int version);

pathAndBytesable<T>

   --public T forpath(String path,byte[] data) throws Exception;

    pubic T forPath(String path) throws Exception;

以上就是一系列最常用的更新資料API,下面通過一些具體場景來說明如何使用這些API。

更新一個節點的資料內容

client.setData().forPath(path);

呼叫該介面後,會返回一個stat物件。

更新一個節點的資料內容,強制指定版本進行更新

client.setData().withVersion(version).forPath(path);

注意,withVersion介面就是用來實現CAS(Compare and Swap)的,version(版本資訊)通常從一箇舊的stat物件中獲取到的。

下面通過一個實際例子來看看如何在程式碼中使用這些API。

清單5-30 Curator更新資料API例項

package book.chapter;
import org.apache.curator.framework.CuratorFramework;
improt org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curatro.retyr.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

//使用Curator更新資料內容
public class Set_Data_Sample{
     static String path = "/zk-book";
     static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("domain1.book.zookeeper:2181").sessionTimeoutMs(5000).retryPolicy(new ExponnentialBackoffRetry(1000,3)).build();
     public static void main(String[] args) throws Exception{
           client.start();
           client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());
           Stat stat = new Stat();
           client.getData().storingStatIn(stat).forPath(path);
           System.out.println("Success set node for:"+path+",new version:"+client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
            try{
                client.setData().withVersion(stat.getVersion()).forPath(path);
            }catch(Exception e){
                   System.out.printlin("Fail set node due to"+ e.getMessage());
            }
     }
}

上面的示例程式演示瞭如何使用Curator的API來進行ZooKeeper資料節點的內容更新。該程式前後進行了兩次更新操作,第一次使用最新的stat變數進行更新操作。更新成功;第二次使用了過期的stat變數進行更新操作,丟擲異常:KeeperErrorCode = BadVersion。

關於Curator的普通用法我們先說到這裡,下面一節我們一起來研究Curator的非同步介面,master選舉,分散式鎖以及分散式計數器。下面是更高階而且非常常用的內容。尤其是zookee的分散式鎖,其應用程度比redis的分散式鎖永盈更為廣泛。