1. 程式人生 > >ZooKeeper服務-操作(API、集合更新、觀察者、ACL)

ZooKeeper服務-操作(API、集合更新、觀察者、ACL)

操作

create:建立一個znode(必須要有父節點)
delete:刪除一個znode(該znode不能有任何子節點)
exists:測試一個znode是否存在並且查詢它的元資料
getACL,setACL:獲取/設定一個znode的ACL
getChildren:獲取一個znode的子節點列表
getData,setData:獲取/設定一個znode所儲存的資料
sync:將客戶端的znode檢視與ZooKeeper同步

ZooKeeper中的更新操作是有條件的。在使用delete或setData操作時必須提供被更新znode的版本號(可以通過exists操作獲得)。如果版本號不匹配,則更新操作會失敗。更新操作是非阻塞操作,因此一個更新失敗的客戶端(由於其他程序同時在更新同一個znode)可以決定是否重試,或執行其他操作,並不會因此而阻塞其他程序的執行。
雖然ZooKeeper可以被看作是一個檔案系統,但出於簡單性的需要,有一些檔案系統的基本操作被它摒棄了。由於ZooKeeper中的檔案較小並且總是被整體讀寫,因此沒有必要提供開啟、關閉或查詢操作。
Sync操作與POSIX檔案系統中的fsync()操作是不同的。如前所述,ZooKeeper中的寫操作具有原子性,一個成功的寫操作會保證將資料寫到ZooKeeper伺服器的持久儲存介質中。然而,ZooKeeper允許客戶端讀到的資料滯後於ZooKeeper服務的最新狀態,因此客戶端可以使用sync操作來獲取資料的最新狀態。

1. 集合更新(Multiupdate)

ZooKeeper中有一個被稱為multi的操作,用於將多個基本操作集合成一個操作單元,並確保這些基本操作同時被成功執行,或者同時失敗,不會發生其中部分基本操作被成功執行而其他基本操作失敗的情況。
集合更新可以被用於在ZooKeeper中構建需要保持全域性一致性的資料結構,例如構建一個無向圖。在ZooKeeper中用一個znode來表示無向圖中的一個頂點,為了在兩個頂點之間新增或刪除一條邊,我們需要同時更新兩個頂點所分別對應的兩個znode,因為每個znode中都有指向對方的引用。如果我們只用ZooKeeper的基本操作來實現邊的更新,可能會讓其他客戶端發現無向圖處於不一致的狀態,即一個頂點具有指向另一個頂點的引用而對方卻沒有對應的應用。將針對兩個znode的更新操作集合到一個multi操作中可以保證這組更新操作的原子性,也就保證了一對頂點之間不會出現不完整的連線。

2. 關於API

對於ZooKeeper客戶端來說,主要有兩種語言繫結(binding)可以使用:Java和C;當然也可以使用Perl、Python和REST的contrib繫結。對於每一種繫結語言來說,在執行操作時都可以選擇同步執行或非同步執行。看exists的簽名
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
它返回一個封裝有znode元資料的Stat物件(如果znode不存在,則返回null)
非同步執行的簽名如下

public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)

因為所有操作的結果都是通過回撥來傳送的,因此在Java API中非同步方法的放回型別都是void。呼叫者傳遞一個回撥的實現,當ZooKeeper相應時,該回調方法被呼叫。這種情況下,回撥採用StatCallback介面,它有以下方法:

/**
* 回撥
* @param rc	返回程式碼,對應於KeeperException的程式碼。每個非零程式碼都代表一個異常
* @param path	對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。當path引數不能提供足夠的資訊時,客戶端可以通過ctx引數來區分不同請求。如果path引數提供了足夠的資訊,ctx可以設為null
* @param ctx	對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。可以是任意物件
* @param stat	這種情況下,stat引數是null
*/
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc:" + rc);
System.out.println("path:" + path);
System.out.println("ctx:" + ctx);
}

非同步API允許以流水線方式處理請求,這在某些情況下可以提供更好的吞吐量。

3. 觀察觸發器

在exists、getChildren和getData這些讀操作上可以設定觀察,這些觀察可以被寫操作create、delete和setData觸發。ACL相關的操作不參與觸發任何觀察。當一個觀察被觸發時會產生一個觀察事件,這個觀察和觸發它的操作共同決定著觀察事件的型別。
當做觀察的znode被建立、刪除或其資料被更新時,設定在exists操作上的觀察將被觸發。
當所觀察的znode被刪除或其資料被更新時,設定在getData操作上的觀察將被觸發。建立znode不會觸發getData操作上的觀察,因為getData操作成功執行的前提是znode必須已經存在。
當所觀察的znode的一個子節點被建立或刪除時,或所觀察的znode自己被刪除時,設定在getChildren操作上的觀察將會被觸發。可以通過觀察時間的型別來判斷被刪除的是znode還是其子節點:NodeDelete型別代表znode被刪除;NodeChildrenChanged型別代表一個子節點被刪除。

一個觀察事件中包含涉及該事件的znode的路徑,因此對於NodeCreated和NodeDeleted事件來說,可通過路徑來判斷哪一個節點被建立或刪除。為了能夠在NodeChildrenChanged事件發生之後判斷是哪些子節點被修改,需要重新呼叫getChildren來獲取新的子節點列表。與之類似,為了能夠在NodeDataChanged事件之後獲取新的資料,需要呼叫getData。

測試

package com.zhen.zookeeper.existsAndWatcher;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * @author FengZhen
 * @date 2018年10月13日
 * exists與觀察者
 *	state=-112 會話超時狀態
	state= -113 認證失敗狀態
	state=  1 連線建立中
	state= 2 (暫時不清楚如何理解這個狀態,ZOO_ASSOCIATING_STATE)
	state=3 連線已建立狀態
	state= 999 無連線狀態
	
	
	type=1 建立節點事件
	type=2 刪除節點事件
	type=3 更改節點事件
	type=4 子節點列表變化事件
	type= -1 會話session事件
	type=-2 監控被移除事件

 */
public class ExistsAndWatcher implements Watcher{

	private static final int SESSION_TIMEOUT = 5000;
	
	private ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);
	
	public void connect(String hosts) throws IOException, InterruptedException {
		/**
		 * hosts:ZooKeeper服務的主機地址(可指定埠,預設是2181)
		 * SESSION_TIMEOUT:以毫秒為單位的會話超時引數(此處為5秒)
		 * this:Watcher物件的例項。Watcher物件接收來自於ZooKeeper的回撥,以獲得各種事件的通知。
		 */
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
		connectedSignal.await();
	}
	
	/**
	 * 當客戶端已經與ZK建立連線後,Watcher的process方法會被呼叫
	 * 引數是一個用於表示該連線的事件。
	 */
	public void process(WatchedEvent event) {
		int type = event.getType().getIntValue();
		System.out.println("watchedEven--" + event.getState().getIntValue() + " : " + type);
		//連線事件
		if (event.getState() == KeeperState.SyncConnected) {
			/**
			 * 通過呼叫CountDownLatch的countDown方法來遞減它的計數器。
			 * 鎖存器(latch)被建立時帶有一個值為1的計數器,用於表示在它釋放所有等待執行緒之前需要發生的事件數。
			 * 在呼叫一次countDown方法之後,計數器的值變為0,則await方法返回。
			 */
			connectedSignal.countDown();
		}
		//如果為建立或者刪除znode的話,需要再新增一個觀察者,觀察後續操作
		if (type == 1 || type == 2) {
			existsAndWatcher("/zoo");
		}
	}

	public void create(String groupName) throws KeeperException, InterruptedException {
		String path = "/" + groupName;
		/**
		 * 用ZK的create方法建立一個新的ZK的znode
		 * path:路徑(用字串表示)
		 * null:znode的內容(位元組陣列,此處為空值)
		 * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫)
		 * CreateMode.PERSISTENT:znode型別
		 * znode型別可以分為兩種:1.短暫的(ephemeral)	2.持久的(persistent)
		 * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。
		 * create方法的返回值是ZK所建立的節點路徑
		 */
		String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		System.out.println("Created " + createdPath);
	}
	
	public void close() throws InterruptedException {
		zk.close();
	}
	
	public boolean exists(String path) throws KeeperException, InterruptedException {
		Stat exists = zk.exists(path, true);
		return null != exists;
	}
	
	public void existsAndWatcher(String path) {
		zk.exists(path, this, new StatCallback() {
			/**
			 * 回撥
			 * @param rc	返回程式碼,對應於KeeperException的程式碼。每個非零程式碼都代表一個異常
			 * @param path	對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。當path引數不能提供足夠的資訊時,客戶端可以通過ctx引數來區分不同請求。如果path引數提供了足夠的資訊,ctx可以設為null
			 * @param ctx	對應於客戶端傳遞給exists方法的引數,用於識別這個回撥所相應的請求。可以是任意物件
			 * @param stat	這種情況下,stat引數是null
			 */
			public void processResult(int rc, String path, Object ctx, Stat stat) {
				System.out.println("rc:" + rc);
				System.out.println("path:" + path);
				System.out.println("ctx:" + ctx);
			}
		}, "標記回撥所相應的請求");
	}
	
	public void delete(String groupName) {
		String path = "/" + groupName;
		try {
			List<String> children = zk.getChildren(path, false);
			for (String child : children) {
				zk.delete(path + "/" + child, -1);
			}
			/**
			 * delete方法有兩個引數
			 * path:節點路徑
			 * -1:版本號
			 * 如果所提供的版本號與znode的版本號一致,ZK會刪除這個znode。
			 * 這是一種樂觀的加鎖機制,使客戶端能夠檢測出對znode的修改衝突。
			 * 通過將版本號設定為-1,可以繞過這個版本檢測機制,不管znode的版本號是什麼而直接將其刪除。
			 * ZK不支援遞迴刪除,因此在刪除父節點之前必須先刪除子節點
			 */
			zk.delete(path, -1);
		} catch (KeeperException e) {
			System.out.printf("Group %s does not exist\n", groupName);
			System.exit(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
		String hosts = "localhost:2181";
		String groupName = "zoo";
		ExistsAndWatcher existsAndWatcher = new ExistsAndWatcher();
		existsAndWatcher.connect(hosts);
		//同步
		boolean exists = existsAndWatcher.exists("/zoo");
		System.out.println("exists zoo:" + exists);
		//非同步
		existsAndWatcher.existsAndWatcher("/zoo");
		existsAndWatcher.create(groupName);
		existsAndWatcher.delete(groupName);
	
		existsAndWatcher.close();
	}
	
}

  

4. ACL列表

每個znode被建立時都會帶有一個ACL列表,用於決定誰可以對它執行何種操作。
ACL依賴於ZooKeeper的客戶端身份驗證機制。ZooKeeper提供了以下幾種身份驗證方式
 Digest:通過使用者名稱和密碼來識別客戶端
 Sasl:通過Kerberos來識別客戶端
 Ip:通過客戶端的IP地址來識別客戶端

在建立一個ZooKeeper會話之後,客戶端可以對自己進行身份驗證。雖然znode的ACL列表會要求所有的客戶端是經過驗證的,但ZooKeeper的身份驗證過程卻是可選的,客戶端必須自己進行身份驗證來支援對znode的訪問。
使用digest方式進行身份驗證的例子
zk.addAuthInfo("digest", "fz:secret".getBytes());
每個ACL都是身份驗證方式、符合該方式的一個身份和一組許可權的組合。例如,如果打算給IP地址為10.0.0.1的客戶端對某個znode的讀許可權,可以使用IP驗證方式、10.0.0.1和READ許可權在該znode上設定一個ACL。

測試

package com.zhen.zookeeper.ACL;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Perms;


/**
 * @author FengZhen
 * @date 2018年11月25日
 * ACL
 */
public class ACLTest implements Watcher{

private static final int SESSION_TIMEOUT = 5000;
	
	private ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);
	
	public void connect(String hosts) throws IOException, InterruptedException {
		/**
		 * hosts:ZooKeeper服務的主機地址(可指定埠,預設是2181)
		 * SESSION_TIMEOUT:以毫秒為單位的會話超時引數(此處為5秒)
		 * this:Watcher物件的例項。Watcher物件接收來自於ZooKeeper的回撥,以獲得各種事件的通知。
		 */
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
		//根據IP
		zk.addAuthInfo("ip", "192.168.1.103".getBytes());
		//根據使用者密碼
		zk.addAuthInfo("digest", "fz:123456".getBytes());
		connectedSignal.await();
	}
	
	/**
	 * 當客戶端已經與ZK建立連線後,Watcher的process方法會被呼叫
	 * 引數是一個用於表示該連線的事件。
	 */
	public void process(WatchedEvent event) {
		//連線事件
		if (event.getState() == KeeperState.SyncConnected) {
			/**
			 * 通過呼叫CountDownLatch的countDown方法來遞減它的計數器。
			 * 鎖存器(latch)被建立時帶有一個值為1的計數器,用於表示在它釋放所有等待執行緒之前需要發生的事件數。
			 * 在呼叫一次countDown方法之後,計數器的值變為0,則await方法返回。
			 */
			connectedSignal.countDown();
		}
	}

	public void createACLIP(String groupName) throws KeeperException, InterruptedException {
		String path = "/" + groupName;
		/**
		 * 用ZK的create方法建立一個新的ZK的znode
		 * path:路徑(用字串表示)
		 * null:znode的內容(位元組陣列,此處為空值)
		 * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫)
		 * CreateMode.PERSISTENT:znode型別
		 * znode型別可以分為兩種:1.短暫的(ephemeral)	2.持久的(persistent)
		 * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。
		 * create方法的返回值是ZK所建立的節點路徑
		 */
		//新增許可權,設定IP
		ACL aclIP = new ACL(Perms.ALL, new Id("ip",  "192.168.1.103"));
		System.out.println(aclIP);
		List<ACL> acls = new ArrayList<ACL>();
		acls.add(aclIP);
		String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT);
		System.out.println("Created " + createdPath);
	}
	
	public void createACLDigest(String groupName) throws KeeperException, InterruptedException, NoSuchAlgorithmException {
		String path = "/" + groupName;
		/**
		 * 用ZK的create方法建立一個新的ZK的znode
		 * path:路徑(用字串表示)
		 * null:znode的內容(位元組陣列,此處為空值)
		 * Ids.OPEN_ACL_UNSAFE:訪問控制列表(簡稱ACL,此處為完全開放的ACL,允許任何客戶端對znode進行讀寫)
		 * CreateMode.PERSISTENT:znode型別
		 * znode型別可以分為兩種:1.短暫的(ephemeral)	2.持久的(persistent)
		 * 建立znode的客戶端斷開連線時,無論客戶端是明確斷開還是因為任何原因而終止,短暫znode都會被ZK服務刪除。持久znode不會被刪除。
		 * create方法的返回值是ZK所建立的節點路徑
		 */
		//新增許可權,設定IP
		ACL aclIP = new ACL(Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("fz:123456")));
		System.out.println(aclIP);
		List<ACL> acls = new ArrayList<ACL>();
		acls.add(aclIP);
		String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT);
		System.out.println("Created " + createdPath);
	}
	
	public void close() throws InterruptedException {
		zk.close();
	}
	
	public void writeZnodeACLDigest(String groupName) throws KeeperException, InterruptedException {
		String path = "/" + groupName;
		zk.setData(path, "test_digest_data".getBytes(), 0);
	}
	
	public void writeZnodeACLIP(String groupName) throws KeeperException, InterruptedException {
		String path = "/" + groupName;
		zk.setData(path, "test_ip_data".getBytes(), 0);
	}
	
	public void readZnode(String groupName) throws KeeperException, InterruptedException {
		String path = "/" + groupName;
		String data = zk.getData(path, false, null).toString();
		System.out.println("data = " + data);
	}
	
	public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
		String hosts = "192.168.1.103:2181";
		String groupNameIP = "znode_acl_test_ip";
		String groupNameDigest = "znode_acl_test_digest";
		ACLTest aclTest = new ACLTest();
		aclTest.connect(hosts);
		//digest
//		aclTest.createACLDigest(groupNameDigest);
//		aclTest.writeZnodeACLDigest(groupNameDigest);
		aclTest.readZnode(groupNameDigest);
		
		//IP
//		aclTest.createACLIP(groupNameIP);
//		aclTest.writeZnodeACLIP(groupNameIP);
//		aclTest.readZnode(groupNameIP);
		
		aclTest.close();
		
		//建立完帶有ACL的znode之後,檢視ACL
		//[zk: localhost:2181(CONNECTED) 7] getAcl /znode_acl_test_ip
		//'ip,'192.168.1.103
		//: cdrwa
		
		//不設定IP直接讀取該znode內容,報錯如下
		//Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /znode_acl_test_ip
		
	}
	
	
}