ZooKeeper服務-操作(API、集合更新、觀察者、ACL)
操作
create:建立一個znode(必須要有父節點)
delete:刪除一個znode(該znode不能有任何子節點)
exists:測試一個znode是否存在並且查詢它的元資料
getACL,setACL:獲取/設定一個znode的ACL
getChildren:獲取一個znode的子節點列表
getData,setData:獲取/設定一個znode所儲存的資料
sync:將客戶端的znode檢視與ZooKeeper同步
ZooKeeper中的更新操作是有條件的。在使用delete或setData操作時必須提供被更新znode的版本號(可以通過exists操作獲得)。如果版本號不匹配,則更新操作會失敗。更新操作是非阻塞操作,因此一個更新失敗的客戶端(由於其他程序同時在更新同一個znode)可以決定是否重試,或執行其他操作,並不會因此而阻塞其他程序的執行。
雖然ZooKeeper可以被看作是一個檔案系統,但出於簡單性的需要,有一些檔案系統的基本操作被它摒棄了。由於ZooKeeper中的檔案較小並且總是被整體讀寫,因此沒有必要提供開啟、關閉或查詢操作。
Sync操作與POSIX檔案系統中的fsync()操作是不同的。如前所述,ZooKeeper中的寫操作具有原子性,一個成功的寫操作會保證將資料寫到ZooKeeper伺服器的持久儲存介質中。然而,ZooKeeper允許客戶端讀到的資料滯後於ZooKeeper服務的最新狀態,因此客戶端可以使用sync操作來獲取資料的最新狀態。
1. 集合更新(Multiupdate)
ZooKeeper中有一個被稱為multi的操作,用於將多個基本操作集合成一個操作單元,並確保這些基本操作同時被成功執行,或者同時失敗,不會發生其中部分基本操作被成功執行而其他基本操作失敗的情況。
集合更新可以被用於在ZooKeeper中構建需要保持全域性一致性的資料結構,例如構建一個無向圖。在ZooKeeper中用一個znode來表示無向圖中的一個頂點,為了在兩個頂點之間新增或刪除一條邊,我們需要同時更新兩個頂點所分別對應的兩個znode,因為每個znode中都有指向對方的引用。如果我們只用ZooKeeper的基本操作來實現邊的更新,可能會讓其他客戶端發現無向圖處於不一致的狀態,即一個頂點具有指向另一個頂點的引用而對方卻沒有對應的應用。將針對兩個znode的更新操作集合到一個multi操作中可以保證這組更新操作的原子性,也就保證了一對頂點之間不會出現不完整的連線。
2. 關於API
對於ZooKeeper客戶端來說,主要有兩種語言繫結(binding)可以使用:Java和C;當然也可以使用Perl、Python和REST的contrib繫結。對於每一種繫結語言來說,在執行操作時都可以選擇同步執行或非同步執行。看exists的簽名
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
它返回一個封裝有znode元資料的Stat物件(如果znode不存在,則返回null)
非同步執行的簽名如下
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
因為所有操作的結果都是通過回撥來傳送的,因此在Java API中非同步方法的放回型別都是void。呼叫者傳遞一個回撥的實現,當ZooKeeper相應時,該回調方法被呼叫。這種情況下,回撥採用StatCallback介面,它有以下方法:
/** * 回撥 * @param rc 返回程式碼,對應於KeeperException的程式碼。每個非零程式碼都代表一個異常 * @param path 對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。當path引數不能提供足夠的資訊時,客戶端可以通過ctx引數來區分不同請求。如果path引數提供了足夠的資訊,ctx可以設為null * @param ctx 對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。可以是任意物件 * @param stat 這種情況下,stat引數是null */ public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc:" + rc); System.out.println("path:" + path); System.out.println("ctx:" + ctx); }
非同步API允許以流水線方式處理請求,這在某些情況下可以提供更好的吞吐量。
3. 觀察觸發器
在exists、getChildren和getData這些讀操作上可以設定觀察,這些觀察可以被寫操作create、delete和setData觸發。ACL相關的操作不參與觸發任何觀察。當一個觀察被觸發時會產生一個觀察事件,這個觀察和觸發它的操作共同決定著觀察事件的型別。
當做觀察的znode被建立、刪除或其資料被更新時,設定在exists操作上的觀察將被觸發。
當所觀察的znode被刪除或其資料被更新時,設定在getData操作上的觀察將被觸發。建立znode不會觸發getData操作上的觀察,因為getData操作成功執行的前提是znode必須已經存在。
當所觀察的znode的一個子節點被建立或刪除時,或所觀察的znode自己被刪除時,設定在getChildren操作上的觀察將會被觸發。可以通過觀察時間的型別來判斷被刪除的是znode還是其子節點:NodeDelete型別代表znode被刪除;NodeChildrenChanged型別代表一個子節點被刪除。
一個觀察事件中包含涉及該事件的znode的路徑,因此對於NodeCreated和NodeDeleted事件來說,可通過路徑來判斷哪一個節點被建立或刪除。為了能夠在NodeChildrenChanged事件發生之後判斷是哪些子節點被修改,需要重新呼叫getChildren來獲取新的子節點列表。與之類似,為了能夠在NodeDataChanged事件之後獲取新的資料,需要呼叫getData。
測試
package com.zhen.zookeeper.existsAndWatcher; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * @author FengZhen * @date 2018年10月13日 * exists與觀察者 * state=-112 會話超時狀態 state= -113 認證失敗狀態 state= 1 連線建立中 state= 2 (暫時不清楚如何理解這個狀態,ZOO_ASSOCIATING_STATE) state=3 連線已建立狀態 state= 999 無連線狀態 type=1 建立節點事件 type=2 刪除節點事件 type=3 更改節點事件 type=4 子節點列表變化事件 type= -1 會話session事件 type=-2 監控被移除事件 */ public class ExistsAndWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; private ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws IOException, InterruptedException { /** * hosts:ZooKeeper服務的主機地址(可指定埠,預設是2181) * SESSION_TIMEOUT:以毫秒為單位的會話超時引數(此處為5秒) * this:Watcher物件的例項。Watcher物件接收來自於ZooKeeper的回撥,以獲得各種事件的通知。 */ zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } /** * 當客戶端已經與ZK建立連線後,Watcher的process方法會被呼叫 * 引數是一個用於表示該連線的事件。 */ public void process(WatchedEvent event) { int type = event.getType().getIntValue(); System.out.println("watchedEven--" + event.getState().getIntValue() + " : " + type); //連線事件 if (event.getState() == KeeperState.SyncConnected) { /** * 通過呼叫CountDownLatch的countDown方法來遞減它的計數器。 * 鎖存器(latch)被建立時帶有一個值為1的計數器,用於表示在它釋放所有等待執行緒之前需要發生的事件數。 * 在呼叫一次countDown方法之後,計數器的值變為0,則await方法返回。 */ connectedSignal.countDown(); } //如果為建立或者刪除znode的話,需要再新增一個觀察者,觀察後續操作 if (type == 1 || type == 2) { existsAndWatcher("/zoo"); } } public void create(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; /** * 用ZK的create方法建立一個新的ZK的znode * path:路徑(用字串表示) * null:znode的內容(位元組陣列,此處為空值) * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫) * CreateMode.PERSISTENT:znode型別 * znode型別可以分為兩種:1.短暫的(ephemeral) 2.持久的(persistent) * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。 * create方法的返回值是ZK所建立的節點路徑 */ String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("Created " + createdPath); } public void close() throws InterruptedException { zk.close(); } public boolean exists(String path) throws KeeperException, InterruptedException { Stat exists = zk.exists(path, true); return null != exists; } public void existsAndWatcher(String path) { zk.exists(path, this, new StatCallback() { /** * 回撥 * @param rc 返回程式碼,對應於KeeperException的程式碼。每個非零程式碼都代表一個異常 * @param path 對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。當path引數不能提供足夠的資訊時,客戶端可以通過ctx引數來區分不同請求。如果path引數提供了足夠的資訊,ctx可以設為null * @param ctx 對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。可以是任意物件 * @param stat 這種情況下,stat引數是null */ public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc:" + rc); System.out.println("path:" + path); System.out.println("ctx:" + ctx); } }, "標記回撥所相應的請求"); } public void delete(String groupName) { String path = "/" + groupName; try { List<String> children = zk.getChildren(path, false); for (String child : children) { zk.delete(path + "/" + child, -1); } /** * delete方法有兩個引數 * path:節點路徑 * -1:版本號 * 如果所提供的版本號與znode的版本號一致,ZK會刪除這個znode。 * 這是一種樂觀的加鎖機制,使客戶端能夠檢測出對znode的修改衝突。 * 通過將版本號設定為-1,可以繞過這個版本檢測機制,不管znode的版本號是什麼而直接將其刪除。 * ZK不支援遞迴刪除,因此在刪除父節點之前必須先刪除子節點 */ zk.delete(path, -1); } catch (KeeperException e) { System.out.printf("Group %s does not exist\n", groupName); System.exit(1); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String hosts = "localhost:2181"; String groupName = "zoo"; ExistsAndWatcher existsAndWatcher = new ExistsAndWatcher(); existsAndWatcher.connect(hosts); //同步 boolean exists = existsAndWatcher.exists("/zoo"); System.out.println("exists zoo:" + exists); //非同步 existsAndWatcher.existsAndWatcher("/zoo"); existsAndWatcher.create(groupName); existsAndWatcher.delete(groupName); existsAndWatcher.close(); } }
4. ACL列表
每個znode被建立時都會帶有一個ACL列表,用於決定誰可以對它執行何種操作。
ACL依賴於ZooKeeper的客戶端身份驗證機制。ZooKeeper提供了以下幾種身份驗證方式
Digest:通過使用者名稱和密碼來識別客戶端
Sasl:通過Kerberos來識別客戶端
Ip:通過客戶端的IP地址來識別客戶端
在建立一個ZooKeeper會話之後,客戶端可以對自己進行身份驗證。雖然znode的ACL列表會要求所有的客戶端是經過驗證的,但ZooKeeper的身份驗證過程卻是可選的,客戶端必須自己進行身份驗證來支援對znode的訪問。
使用digest方式進行身份驗證的例子
zk.addAuthInfo("digest", "fz:secret".getBytes());
每個ACL都是身份驗證方式、符合該方式的一個身份和一組許可權的組合。例如,如果打算給IP地址為10.0.0.1的客戶端對某個znode的讀許可權,可以使用IP驗證方式、10.0.0.1和READ許可權在該znode上設定一個ACL。
測試
package com.zhen.zookeeper.ACL; import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Perms; /** * @author FengZhen * @date 2018年11月25日 * ACL */ public class ACLTest implements Watcher{ private static final int SESSION_TIMEOUT = 5000; private ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws IOException, InterruptedException { /** * hosts:ZooKeeper服務的主機地址(可指定埠,預設是2181) * SESSION_TIMEOUT:以毫秒為單位的會話超時引數(此處為5秒) * this:Watcher物件的例項。Watcher物件接收來自於ZooKeeper的回撥,以獲得各種事件的通知。 */ zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); //根據IP zk.addAuthInfo("ip", "192.168.1.103".getBytes()); //根據使用者密碼 zk.addAuthInfo("digest", "fz:123456".getBytes()); connectedSignal.await(); } /** * 當客戶端已經與ZK建立連線後,Watcher的process方法會被呼叫 * 引數是一個用於表示該連線的事件。 */ public void process(WatchedEvent event) { //連線事件 if (event.getState() == KeeperState.SyncConnected) { /** * 通過呼叫CountDownLatch的countDown方法來遞減它的計數器。 * 鎖存器(latch)被建立時帶有一個值為1的計數器,用於表示在它釋放所有等待執行緒之前需要發生的事件數。 * 在呼叫一次countDown方法之後,計數器的值變為0,則await方法返回。 */ connectedSignal.countDown(); } } public void createACLIP(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; /** * 用ZK的create方法建立一個新的ZK的znode * path:路徑(用字串表示) * null:znode的內容(位元組陣列,此處為空值) * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫) * CreateMode.PERSISTENT:znode型別 * znode型別可以分為兩種:1.短暫的(ephemeral) 2.持久的(persistent) * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。 * create方法的返回值是ZK所建立的節點路徑 */ //新增許可權,設定IP ACL aclIP = new ACL(Perms.ALL, new Id("ip", "192.168.1.103")); System.out.println(aclIP); List<ACL> acls = new ArrayList<ACL>(); acls.add(aclIP); String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT); System.out.println("Created " + createdPath); } public void createACLDigest(String groupName) throws KeeperException, InterruptedException, NoSuchAlgorithmException { String path = "/" + groupName; /** * 用ZK的create方法建立一個新的ZK的znode * path:路徑(用字串表示) * null:znode的內容(位元組陣列,此處為空值) * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫) * CreateMode.PERSISTENT:znode型別 * znode型別可以分為兩種:1.短暫的(ephemeral) 2.持久的(persistent) * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。 * create方法的返回值是ZK所建立的節點路徑 */ //新增許可權,設定IP ACL aclIP = new ACL(Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("fz:123456"))); System.out.println(aclIP); List<ACL> acls = new ArrayList<ACL>(); acls.add(aclIP); String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT); System.out.println("Created " + createdPath); } public void close() throws InterruptedException { zk.close(); } public void writeZnodeACLDigest(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; zk.setData(path, "test_digest_data".getBytes(), 0); } public void writeZnodeACLIP(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; zk.setData(path, "test_ip_data".getBytes(), 0); } public void readZnode(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName; String data = zk.getData(path, false, null).toString(); System.out.println("data = " + data); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { String hosts = "192.168.1.103:2181"; String groupNameIP = "znode_acl_test_ip"; String groupNameDigest = "znode_acl_test_digest"; ACLTest aclTest = new ACLTest(); aclTest.connect(hosts); //digest // aclTest.createACLDigest(groupNameDigest); // aclTest.writeZnodeACLDigest(groupNameDigest); aclTest.readZnode(groupNameDigest); //IP // aclTest.createACLIP(groupNameIP); // aclTest.writeZnodeACLIP(groupNameIP); // aclTest.readZnode(groupNameIP); aclTest.close(); //建立完帶有ACL的znode之後,檢視ACL //[zk: localhost:2181(CONNECTED) 7] getAcl /znode_acl_test_ip //'ip,'192.168.1.103 //: cdrwa //不設定IP直接讀取該znode內容,報錯如下 //Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /znode_acl_test_ip } }