1. 程式人生 > >Zookeeper 快速入門(上)

Zookeeper 快速入門(上)

his 限制 change gic ise child 可能 他會 tom

來源:holynull,

blog.leanote.com/post/holynull/Zookeeper

如有好文章投稿,請點擊 → 這裏了解詳情

Zookeeper是Hadoop分布式調度服務,用來構建分布式應用系統。構建一個分布式應用是一個很復雜的事情,主要的原因是我們需要合理有效的處理分布式集群中的部分失敗的問題。例如,集群中的節點在相互通信時,A節點向B節點發送消息。A節點如果想知道消息是否發送成功,只能由B節點告訴A節點。那麽如果B節點關機或者由於其他的原因脫離集群網絡,問題就出現了。A節點不斷的向B發送消息,並且無法獲得B的響應。B也沒有辦法通知A節點已經離線或者關機。集群中其他的節點完全不知道B發生了什麽情況,還在不斷的向B發送消息。這時,你的整個集群就發生了部分失敗的故障。

Zookeeper不能讓部分失敗的問題徹底消失,但是它提供了一些工具能夠讓你的分布式應用安全合理的處理部分失敗的問題。

安裝和運行Zookeeper

我們采用standalone模式,安裝運行一個單獨的zookeeper服務。安裝前請確認您已經安裝了Java運行環境。

我們去Apache ZooKeeper releases page下載zookeeper安裝包,並解壓到本地:

% tar xzf zookeeper-x.y.z.tar.gz

ZooKeeper提供了一些可執行程序的工具,為了方便起見,我們將這些工具的路徑加入到PATH環境變量中:

% export ZOOKEEPER_HOME=~/sw/zookeeper-x.y.z

% export PATH=$PATH:$ZOOKEEPER_HOME/bin

運行ZooKeeper之前我們需要編寫配置文件。配置文件一般在安裝目錄下的conf/zoo.cfg。我們可以把這個文件放在/etc/zookeeper下,或者放到其他目錄下,並在環境變量設置ZOOCFGDIR指向這個個目錄。下面是配置文件的內容:

tickTime=2000

dataDir=/Users/tom/zookeeper

clientPort=2181

tickTime是zookeeper中的基本時間單元,單位是毫秒。datadir是zookeeper持久化數據存放的目錄。clientPort是zookeeper監聽客戶端連接的端口,默認是2181.

啟動命令:

% zkServer.sh start

我們通過nc或者telnet命令訪問2181端口,通過執行ruok(Are you OK?)命令來檢查zookeeper是否啟動成功:

% echo ruok | nc localhost 2181

imok

那麽我看見zookeeper回答我們“I’m OK”。下表中是所有的zookeeper的命名,都是由4個字符組成。

技術分享

技術分享

3.5.0以上的版本會有一個內嵌的web服務,通過訪問http://localhost:8080/commands來訪問以上的命令列表。

Zookeeper開發實例

這一節我們將講解如何編寫Zookeeper客戶端的程序,來控制zookeeper上的數據,以達到管理客戶端所在集群的成員關系。

ZooKeeper中的組和成員

我們可以把Zookeeper理解為一個高可用的文件系統。但是它沒有文件和文件夾的概念,只有一個叫做znode的節點概念。那麽znode即是數據的容器,也是其他節點的容器。(其實znode就可以理解為文件或者是文件夾)我們用父節點和子節點的關系來表示組和成員的關系。那麽一個節點代表一個組,組節點下的子節點代表組內的成員。如下圖所示:

技術分享

創建組

我們使用zookeeper的Java API來創建一個/zoo的組節點:

public class CreateGroup implements Watcher {

private static final int SESSION_TIMEOUT = 5000;

private ZooKeeper zk;

private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

connectedSignal.await();

}

@Override

public void process(WatchedEvent event) { // Watcher interface

if (event.getState() == KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

public void create(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("Created " + createdPath);

}

public void close() throws InterruptedException {

zk.close();

}

public static void main(String[] args) throws Exception {

CreateGroup createGroup = new CreateGroup();

createGroup.connect(args[0]);

createGroup.create(args[1]);

createGroup.close();

}

}

當main()執行時,首先創建了一個CreateGroup的對象,然後調用connect()方法,通過zookeeper的API與zookeeper服務器連接。創建連接我們需要3個參數:一是服務器端主機名稱以及端口號,二是客戶端連接服務器session的超時時間,三是Watcher接口的一個實例。Watcher實例負責接收Zookeeper數據變化時產生的事件回調。

在連接函數中創建了zookeeper的實例,然後建立與服務器的連接。建立連接函數會立即返回,所以我們需要等待連接建立成功後再進行其他的操作。我們使用CountDownLatch來阻塞當前線程,直到zookeeper準備就緒。這時,我們就看到Watcher的作用了。我們實現了Watcher接口的一個方法:

public void process(WatchedEvent event);

當客戶端連接上了zookeeper服務器,Watcher將由process()函數接收一個連接成功的事件。我們接下來調用CountDownLatch,釋放之前的阻塞。

連接成功後,我們調用create()方法。我們在這個方法中調用zookeeper實例的create()方法來創建一個znode。參數包括:一是znode的path;二是znode的內容(一個二進制數組),三是一個access control list(ACL,訪問控制列表,這裏使用完全開放模式),最後是znode的性質。

znode的性質分為ephemeral和persistent兩種。ephemeral性質的znode在創建他的客戶端的會話結束,或者客戶端以其他原因斷開與服務器的連接時,會被自動刪除。而persistent性質的znode就不會被自動刪除,除非客戶端主動刪除,而且不一定是創建它的客戶端可以刪除它,其他客戶端也可以刪除它。這裏我們創建一個persistent的znode。

create()將返回znode的path。我們講新建znode的path打印出來。

我們執行如上程序:

% export CLASSPATH=ch21-zk/target/classes/:$ZOOKEEPER_HOME/*:\

$ZOOKEEPER_HOME/lib/*:$ZOOKEEPER_HOME/conf

% java CreateGroup localhost zoo

Created /zoo

加入組

接下來我們實現如何在一個組中註冊成員。我們將使用ephemeral znode來創建這些成員節點。那麽當客戶端程序退出時,這些成員將被刪除。

我們創建一個ConnetionWatcher類,然後繼承實現一個JoinGroup類:

public class ConnectionWatcher implements Watcher {

private static final int SESSION_TIMEOUT = 5000;

protected ZooKeeper zk;

private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

connectedSignal.await();

}

@Override

public void process(WatchedEvent event) {

if (event.getState() == KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

public void close() throws InterruptedException {

zk.close();

}

}

public class JoinGroup extends ConnectionWatcher {

public void join(String groupName, String memberName) throws KeeperException,

InterruptedException {

String path = "/" + groupName + "/" + memberName;

String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL);

System.out.println("Created " + createdPath);

}

public static void main(String[] args) throws Exception {

JoinGroup joinGroup = new JoinGroup();

joinGroup.connect(args[0]);

joinGroup.join(args[1], args[2]);

// stay alive until process is killed or thread is interrupted

Thread.sleep(Long.MAX_VALUE);

}

}

加入組與創建組非常相似。我們加入了一個ephemeral znode後,讓線程阻塞住。然後我們可以使用命令行查看zookeeper中我們創建的znode。當我們將阻塞的程序強行關閉後,我們會發現我們創建的znode會自動消失。

成員列表

下面我們實現一個程序來列出一個組中的所有成員。

public class ListGroup extends ConnectionWatcher {

public void list(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

try {

List<String> children = zk.getChildren(path, false);

if (children.isEmpty()) {

System.out.printf("No members in group %s\n", groupName);

System.exit(1);

}

for (String child : children) {

System.out.println(child);

}

} catch (KeeperException.NoNodeException e) {

System.out.printf("Group %s does not exist\n", groupName);

System.exit(1);

}

}

public static void main(String[] args) throws Exception {

ListGroup listGroup = new ListGroup();

listGroup.connect(args[0]);

listGroup.list(args[1]);

listGroup.close();

}

}

我們在list()方法中通過調用getChildren()方法來獲得某一個path下的子節點,然後打印出來。我們這裏會試著捕獲KeeperException.NoNodeException,當znode不存在時會拋出這個異常。我們運行程序,會看見如下結果,說明我們還沒在zoo組中添加任何成員幾點:

% java ListGroup localhost zoo

No members in group zoo

我們可以運行之前的JoinGroup來添加成員。在後臺運行一些JoinGroup程序,這些程序添加節點後都處於sleep狀態:

% java JoinGroup localhost zoo duck &

% java JoinGroup localhost zoo cow &

% java JoinGroup localhost zoo goat &

% goat_pid=$!

最後一行命令的作用是將最後一個啟動的java程序的pid記錄下來,我們好在列出zoo下面的成員後,將該進程kill掉。

下面我們將zoo下的成員打印出來:

% java ListGroup localhost zoo

goat

duck

cow

然後我們將kill掉最後啟動的JoinGroup客戶端:

% kill $goat_pid

過幾秒後,我們發現goat節點不見了。因為之前我們創建的goat節點是一個ephemeral節點,而創建這個節點的客戶端在ZooKeeper上的會話已經被終結了,因為這個回話在5秒後失效了(我們設置了會話的超時時間為5秒):

% java ListGroup localhost zoo

duck

cow

讓我們回過頭來看看,我們到底都做了一些什麽?我們首先創建了一個節點組,這些節點的創建者都在同一個分布式系統中。這些節點的創建者之間互相都不知情。一個創建者想使用這些節點數據進行一些工作,例如通過znode節點是否存在來判斷節點的創建者是否存在。

最後一點,我們不能只依靠組成員關系來完全解決在與節點通信時的網絡錯誤。當與一個集群組成員節點進行通信時,發生了通信失敗,我們需要使用重試或者試驗與組中其他的節點通信,來解決這次通信失敗。

Zookeeper的命令行工具

Zookeeper有一套命令行工具。我們可以像如下使用,來查找zoo下的成員節點:

% zkCli.sh -server localhost ls /zoo

[cow, duck]

你可以不加參數運行這個工具,來獲得幫助。

刪除分組

下面讓我們來看一下如何刪除一個分組?

ZooKeeper的API提供一個delete()方法來刪除一個znode。我們通過輸入znode的path和版本號(version number)來刪除想要刪除的znode。我們除了使用path來定位我們要刪除的znode,還需要一個參數是版本號。只有當我們指定要刪除的本版號,與znode當前的版本號一致時,ZooKeeper才允許我們將znode刪除掉。這是一種optimistic locking機制,用來處理znode的讀寫沖突。我們也可以忽略版本號一致檢查,做法就是版本號賦值為-1。

刪除一個znode之前,我們需要先刪除它的子節點,就下如下代碼中實現的那樣:

public class DeleteGroup extends ConnectionWatcher {

public void delete(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

try {

List<String> children = zk.getChildren(path, false);

for (String child : children) {

zk.delete(path + "/" + child, -1);

}

zk.delete(path, -1);

} catch (KeeperException.NoNodeException e) {

System.out.printf("Group %s does not exist\n", groupName);

System.exit(1);

}

}

public static void main(String[] args) throws Exception {

DeleteGroup deleteGroup = new DeleteGroup();

deleteGroup.connect(args[0]);

deleteGroup.delete(args[1]);

deleteGroup.close();

}

}

最後我們執行如下操作來刪除zoo group:

% java DeleteGroup localhost zoo

% java ListGroup localhost zoo

Group zoo does not exist

Zookeeper 服務

ZooKeeper 是一個高可用的高性能調度服務。這一節我們將講述他的模型、操作和接口。

數據模型 Data Model

ZooKeeper包含一個樹形的數據模型,我們叫做znode。一個znode中包含了存儲的數據和ACL(Access Control List)。ZooKeeper的設計適合存儲少量的數據,並不適合存儲大量數據,所以znode的存儲限制最大不超過1M。

數據的訪問被定義成原子性的。什麽是原子性呢?一個客戶端訪問一個znode時,不會只得到一部分數據;客戶端訪問數據要麽獲得全部數據,要麽讀取失敗,什麽也得不到。相似的,寫操作時,要麽寫入全部數據,要麽寫入失敗,什麽也寫不進去。ZooKeeper能夠保證寫操作只有兩個結果,成功和失敗。絕對不會出現只寫入了一部分數據的情況。與HDFS不同,ZooKeeper不支持字符的append(連接)操作。原因是HDFS是被設計成支持數據流訪問(streaming data access)的大數據存儲,而ZooKeeper則不是。

我們可以通過path來定位znode,就像Unix系統定位文件一樣,使用斜杠來表示路徑。但是,znode的路徑只能使用絕對路徑,而不能想Unix系統一樣使用相對路徑,即Zookeeper不能識別../和./這樣的路徑。

節點的名稱是由Unicode字符組成的,除了zookeeper這個字符串,我們可以任意命名節點。為什麽不能使用zookeeper命名節點呢?因為ZooKeeper已經默認使用zookeeper來命名了一個根節點,用來存儲一些管理數據。

請註意,這裏的path並不是URIs,在Java API中是一個String類型的變量。

Ephemeral znodes

我們已經知道,znode有兩種類型:ephemeral和persistent。在創建znode時,我們指定znode的類型,並且在之後不會再被修改。當創建znode的客戶端的session結束後,ephemeral類型的znode將被刪除。persistent類型的znode在創建以後,就與客戶端沒什麽聯系了,除非主動去刪除它,否則他會一直存在。Ephemeral znode沒有任何子節點。

雖然Ephemeral znode沒有綁定到客戶端的session,但是任何一個客戶端都可以訪問它,當然是在他們的ACL策略下允許訪問的情況下。我們在創建分布式系統時,需要知道分布式資源是否可用。Ephemeral znode就是為這種場景應運而生的。正如我們之前講述的例子中,使用Ephemeral znode來實現一個成員關系管理,任何一個客戶端進程任何時候都可以知道其他成員是否可用。

Znode的序號

如果在創建znode時,我們使用排序標誌的話,ZooKeeper會在我們指定的znode名字後面增加一個數字。我們繼續加入相同名字的znode時,這個數字會不斷增加。這個序號的計數器是由這些排序znode的父節點來維護的。

如果我們請求創建一個znode,指定命名為/a/b-,那麽ZooKeeper會為我們創建一個名字為/a/b-3的znode。我們再請求創建一個名字為/a/b-的znode,ZooKeeper會為我們創建一個名字/a/b-5的znode。ZooKeeper給我們指定的序號是不斷增長的。Java API中的create()的返回結果就是znode的實際名字。

那麽序號用來幹什麽呢?當然是用來排序用的!後面《A Lock Service》中我們將講述如何使用znode的序號來構建一個share lock。

觀察模式 Watches

觀察模式可以使客戶端在某一個znode發生變化時得到通知。觀察模式有ZooKeeper服務的某些操作啟動,並由其他的一些操作來觸發。例如,一個客戶端對一個znode進行了exists操作,來判斷目標znode是否存在,同時在znode上開啟了觀察模式。如果znode不存在,這exists將返回false。如果稍後,另外一個客戶端創建了這個znode,觀察模式將被觸發,將znode的創建事件通知之前開啟觀察模式的客戶端。我們將在以後詳細介紹其他的操作和觸發。

觀察模式只能被觸發一次。如果要一直獲得znode的創建和刪除的通知,那麽就需要不斷的在znode上開啟觀察模式。在上面的例子中,如果客戶端還繼續需要獲得znode被刪除的通知,那麽在獲得創建通知後,客戶端還需要繼續對這個znode進行exists操作,再開啟一次觀察模式。

在《A Configuration Service》中,有一個例子將講述如何使用觀察模式在集群中更新配置。

操作 Operations

下面的表格中列出了9種ZooKeeper的操作。

技術分享

調用delete和setData操作時,我們必須指定一個znode版本號(version number),即我們必須指定我們要刪除或者更新znode數據的哪個版本。如果版本號不匹配,操作將會失敗。失敗的原因可能是在我們提交之前,該znode已經被修改過了,版本號發生了增量變化。那麽我們該怎麽辦呢?我可以考慮重試,或者調用其他的操作。例如,我們提交更新失敗後,可以重新獲取znode當前的數據,看看當前的版本號是什麽,再做更新操作。

ZooKeeper雖然可以被看作是一個文件系統,但是由於ZooKeeper文件很小,所以沒有提供像一般文件系統所提供的open、close或者seek操作。

技術分享

批量更新 Multiupdate

ZooKeeper支持將一些原始的操作組合成一個操作單元,然後執行這些操作。那麽這種批量操作也是具有原子性的,只可能有兩種執行結果,成功和失敗。批量操作單元中的操作,不會出現一些操作執行成功,一些操作執行失敗的情況,即要麽都成功,要麽都失敗。

Multiupdate對於綁定一些結構化的全局變量很有用處。例如綁定一個無向圖(undirected graph)。無向圖的頂點(vertex)由znode來表示。添加和刪除邊(edge)的操作,由修改邊的兩個關聯znode來實現。如果我們使用ZooKeeper的原始的操作來實現對邊(edge)的操作,那麽就有可能產生兩個znode修改不一致的情況(一個修改成功,一個修改失敗)。那麽我們將修改兩個znode的操作放入到一個Multi修改單元中,就能夠保證兩個znode,要麽都修改成功,要麽都修改失敗。這樣就能夠避免修改無向圖的邊時產生修改不一致的現象。

APIs

ZooKeeper客戶端使用的核心編程語言有JAVA和C;同時也支持Perl、Python和REST。執行操作的方式呢,分為同步執行和異步執行。我們之前已經見識過了同步的Java API中的exists。

public Stat exists(String path, Watcher watcher) throws KeeperException,

InterruptedException

下面代碼則是異步方式的exists:

public void exists(String path, Watcher watcher, StatCallback cb, Object ctx)

Java API中,異步的方法的返回類型都是void,而操作的返回的結果將傳遞到回調對象的回調函數中。回調對象將實現StatCallback接口中的一個回調函數,來接收操作返回的結果。函數接口如下:

public void processResult(int rc, String path, Object ctx, Stat stat);

參數rc表示返回碼,請參考KeeperException中的定義。在stat參數為null的情況下,非0的值表示一種異常。參數path和ctx與客戶端調用的exists方法中的參數相等,這兩個參數通常用來確定回調中獲得的響應是來至於哪個請求的。參數ctx可以是任意對象,只有當path參數不能消滅請求的歧義時才會用到。如果不需要參數ctx,可以設置為null。

技術分享

觀察模式觸發器 Watch triggers

讀操作,例如:exists、getChildren、getData會在znode上開啟觀察模式,並且寫操作會觸發觀察模式事件,例如:create、delete和setData。ACL(Access Control List)操作不會啟動觀察模式。觀察模式被觸發時,會生成一個事件,這個事件的類型取決於觸發他的操作:

  • exists啟動的觀察模式,由創建znode,刪除znode和更新znode操作來觸發。

  • getData啟動的觀察模式,由刪除znode和更新znode操作觸發。創建znode不會觸發,是因為getData操作成功的前提是znode必須已經存在。

  • getChildren啟動的觀察模式,由子節點創建和刪除,或者本節點被刪除時才會被觸發。我們可以通過事件的類型來判斷是本節點被刪除還是子節點被刪除:NodeChildrenChanged表示子節點被刪除,而NodeDeleted表示本節點刪除。

技術分享

事件包含了觸發事件的znode的path,所以我們通過NodeCreated和NodeDeleted事件就可以知道哪個znode被創建了或者刪除了。如果我們需要在NodeChildrenChanged事件發生後知道哪個子節點被改變了,我們就需要再調用一次getChildren來獲得一個新的子節點列表。與之類似,在NodeDataChanged事件發生後,我們需要調用getData來獲得新的數據。我們在編寫程序時,會在接收到事件通知後改變znode的狀態,所以我們一定要清楚的記住znode的狀態變化。

ACLs 訪問控制操作

znode的創建時,我們會給他一個ACL(Access Control List),來決定誰可以對znode做哪些操作。

ZooKeeper通過鑒權來獲得客戶端的身份,然後通過ACL來控制客戶端的訪問。鑒權方式有如下幾種:

  • digest

    使用用戶名和密碼方式

  • sasl

    使用Kerberos鑒權

  • ip

    使用客戶端的IP來鑒權

客戶端可以在與ZooKeeper建立會話連接後,自己給自己授權。授權是並不是必須的,雖然znode的ACL要求客戶端必須是身份合法的,在這種情況下,客戶端可以自己授權來訪問znode。下面的例子,客戶端使用用戶名和密碼為自己授權:

zk.addAuthInfo("digest", "tom:secret".getBytes());

ACL是由鑒權方式、鑒權方式的ID和一個許可(permession)的集合組成。例如,我們想通過一個ip地址為10.0.0.1的客戶端訪問一個znode。那麽,我們需要為znode設置一個ACL,鑒權方式使用IP鑒權方式,鑒權方式的ID為10.0.0.1,只允許讀權限。使用JAVA我們將像如下方式創建一個ACL對象:

new ACL(Perms.READ,new Id("ip", "10.0.0.1"));

所有的許可權限將在下表中列出。請註意,exists操作不受ACL的控制,所以任何一個客戶端都可以通過exists操作來獲得任何znode的狀態,從而得知znode是否真的存在。

在ZooDefs.Ids類中,有一些ACL的預定義變量,包括OPEN_ACL_UNSAFE,這個設置表示將賦予所有的許可給客戶端(除了ADMIN的許可)。

另外,我們可以使用ZooKeeper鑒權的插件機制,來整合第三方的鑒權系統。

Zookeeper 快速入門(上)