1. 程式人生 > >Zookeeper——2、使用Zookeeper原生API操作Zookeeper

Zookeeper——2、使用Zookeeper原生API操作Zookeeper

zookeeper的javaclient可以使我們更輕鬆的實現對zookeeper的各種操作,要使用java操作zookeeper,需要引入zookeeper-3.4.5.jar和zkclient-0.1.jar。zookeeper-3.4.5.jar是官方提供的JAVA API,zkclient-0.1.jar則是在原生API基礎上進行擴充套件的開源Java客戶端。(zookeeper文章所需的jar包

客戶端可以通過建立一個zookeeper例項來連線zookeeper伺服器,Zookeeper(Arguments)方法(一共有4個構造方法,根據引數不同),引數說明如下:

connectString:連線伺服器列表,用“,”分割。

sessionTimeout:心跳檢測時間週期,單位毫秒。

watcher:事件處理通知器。

canBeReadOnly:標識當前會話是否支援只讀。

sessionId和sessionPassword:提供連線zookeeper伺服器的sessionId和密碼,通過這兩個引數確定唯一一臺客戶端,目的是可以提供重複會話。

注意:zookeeper客戶端和伺服器會話的建立是一個非同步的過程,也就是說在程式中,程式方法在處理完客戶端初始化後立即返回(即程式繼續往下執行程式碼,這樣,在大多數情況下並沒有真正的構建好一個可用會話,在會話的生命週期處於“CONNECTING”時才算真正的建立完畢,所以需要使用到多執行緒中的一個工具類CountDownLatch)。

先看程式碼,程式碼中和後面有說明。HelloWorld入門:

public class ZookeeperBase {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";
	/** session超時時間 */
	static final int SESSION_OUTTIME = 2000;//ms 
	/** 訊號量,阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
	static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	public static void main(String[] args) throws Exception{
		
		ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
			@Override
			public void process(WatchedEvent event) {
				//獲取事件的狀態
				KeeperState keeperState = event.getState();
				EventType eventType = event.getType();
				//如果是建立連線
				if(KeeperState.SyncConnected == keeperState){
					if(EventType.None == eventType){
						//如果建立連線成功,則傳送訊號量,讓後續阻塞程式向下執行
						connectedSemaphore.countDown();
						System.out.println("zk 建立連線");
					}
				}
			}
		});

		//進行阻塞
		connectedSemaphore.await();
		
		System.out.println("..");
		//建立父節點
//		zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		
		//建立子節點,使用EPHEMERAL,主程式執行完成後該節點被刪除,只在本次會話內有效,可以用作分散式鎖。
//		zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		
		//獲取節點資訊
//		byte[] data = zk.getData("/testRoot", false, null);
//		System.out.println(new String(data));
//		System.out.println(zk.getChildren("/testRoot", false));
		
		//修改節點的值,-1表示跳過版本檢查,其他正數表示如果傳入的版本號與當前版本號不一致,則修改不成功,刪除是同樣的道理。
//		zk.setData("/testRoot", "modify data root".getBytes(), -1);
//		byte[] data = zk.getData("/testRoot", false, null);
//		System.out.println(new String(data));		
		
		//判斷節點是否存在
//		System.out.println(zk.exists("/testRoot/children", false));
		//刪除節點
//		zk.delete("/testRoot/children", -1);
//		System.out.println(zk.exists("/testRoot/children", false));
		
		zk.close();
		
		
		
	}
	
}

說明:

原生的zookeeper API提供了兩種建立節點的方法,同步和非同步建立節點方式。

同步方式:

引數1,節點路徑(名稱):/nodeName。不允許遞迴建立節點,在父節點不存在的情況下,不允許建立子節點。

引數2,節點內容:要求型別是位元組陣列,也就是說不支援序列話方式,如果需要實現序列化,可使用java相關序列化框架,如Hessian,Kryo。

引數3,節點許可權:使用Ids.OPEN_ACL_UNSAFE開放許可權即可。

引數4,節點型別:建立節點的型別,CreateMode.*,提供瞭如下所示的四種節點型別:

①PERSISTENT(持久節點)

②PERSISTENT_SEQUENTIAL(持久順序節點)

③EPHEMERAL(臨時節點,本次會話有效

④EPHEMERAL_SEQUENTIAL(臨時順序節點,本次會話有效

非同步方式(在同步方法引數的基礎上增加兩個引數):

引數5,回撥方法:註冊一個非同步回撥方法,要實現AsynCallBack.StringCallBack介面,重寫processResult(int rc, String path, Object ctx, String name)方法,當節點建立完成後執行此方法。

①rc:服務端響應碼,0表示呼叫成功、-4表示埠連線、-110表示指定節點存在、-112表示會話已過期。

②path:介面呼叫時傳入的資料節點的路徑引數。

③ctx:呼叫介面傳入的ctx值。

④name:實際在服務端建立的節點的名稱。

引數6,傳遞給回撥方法的引數,一般為上下文(Context)資訊。


Watcher、ZK狀態、事件型別:

zookeeper有watch事件,是一次性觸發的。當watch監視的資料發生變化時,通知在建立zookeeper是設定了Watcher的客戶端。Watcher類監視的事件型別和狀態型別如下所示:

事件型別(znode節點相關):

①EventType.NodeCreated:節點建立

②EventType.NodeDataChanged:節點資料變更

③EventType.NodeChildrenChanged:子節點變更

④EventType.NodeDeleted:節點刪除

狀態型別(客戶端例項相關):

①KeeperState.Disconnected:未連線

②KeeperState.SyncConnected:已連線

③KeeperState.AuthFailed:認證失敗

④KeeperState.Expired:會話失效

public class ZooKeeperWatcher implements Watcher {

	/** 定義原子變數 */
	AtomicInteger seq = new AtomicInteger();
	/** 定義session失效時間 */
	private static final int SESSION_TIMEOUT = 10000;
	/** zookeeper伺服器地址 */
	private static final String CONNECTION_ADDR = "192.168.80.88:2181";
	/** zk父路徑設定 */
	private static final String PARENT_PATH = "/testWatch";
	/** zk子路徑設定 */
	private static final String CHILDREN_PATH = "/testWatch/children";
	/** 進入標識 */
	private static final String LOG_PREFIX_OF_MAIN = "【Main】";
	/** zk變數 */
	private ZooKeeper zk = null;
	/** 訊號量設定,用於等待zookeeper連線建立之後 通知阻塞程式繼續向下執行 */
	private CountDownLatch connectedSemaphore = new CountDownLatch(1);

	/**
	 * 建立ZK連線
	 * @param connectAddr ZK伺服器地址列表
	 * @param sessionTimeout Session超時時間
	 */
	public void createConnection(String connectAddr, int sessionTimeout) {
		this.releaseConnection();
		try {
			zk = new ZooKeeper(connectAddr, sessionTimeout, this);
			System.out.println(LOG_PREFIX_OF_MAIN + "開始連線ZK伺服器");
			connectedSemaphore.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 關閉ZK連線
	 */
	public void releaseConnection() {
		if (this.zk != null) {
			try {
				this.zk.close();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 建立節點
	 * @param path 節點路徑
	 * @param data 資料內容
	 * @return 
	 */
	public boolean createPath(String path, String data) {
		try {
			//設定監控(由於zookeeper的監控都是一次性的所以 每次必須設定監控)
			this.zk.exists(path, true);
			System.out.println(LOG_PREFIX_OF_MAIN + "節點建立成功, Path: " + 
							   this.zk.create(	/**路徑*/ 
									   			path, 
									   			/**資料*/
									   			data.getBytes(), 
									   			/**所有可見*/
								   				Ids.OPEN_ACL_UNSAFE, 
								   				/**永久儲存*/
								   				CreateMode.PERSISTENT ) + 	
							   ", content: " + data);
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
		return true;
	}

	/**
	 * 讀取指定節點資料內容
	 * @param path 節點路徑
	 * @return
	 */
	public String readData(String path, boolean needWatch) {
		try {
			return new String(this.zk.getData(path, needWatch, null));
		} catch (Exception e) {
			e.printStackTrace();
			return "";
		}
	}

	/**
	 * 更新指定節點資料內容
	 * @param path 節點路徑
	 * @param data 資料內容
	 * @return
	 */
	public boolean writeData(String path, String data) {
		try {
			System.out.println(LOG_PREFIX_OF_MAIN + "更新資料成功,path:" + path + ", stat: " +
								this.zk.setData(path, data.getBytes(), -1));
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	/**
	 * 刪除指定節點
	 * 
	 * @param path
	 *            節點path
	 */
	public void deleteNode(String path) {
		try {
			this.zk.delete(path, -1);
			System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 判斷指定節點是否存在
	 * @param path 節點路徑
	 */
	public Stat exists(String path, boolean needWatch) {
		try {
			return this.zk.exists(path, needWatch);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 獲取子節點
	 * @param path 節點路徑
	 */
	private List<String> getChildren(String path, boolean needWatch) {
		try {
			return this.zk.getChildren(path, needWatch);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 刪除所有節點
	 */
	public void deleteAllTestPath() {
		if(this.exists(CHILDREN_PATH, false) != null){
			this.deleteNode(CHILDREN_PATH);
		}
		if(this.exists(PARENT_PATH, false) != null){
			this.deleteNode(PARENT_PATH);
		}		
	}
	
	/**
	 * 收到來自Server的Watcher通知後的處理。
	 */
	@Override
	public void process(WatchedEvent event) {
		
		System.out.println("進入 process 。。。。。event = " + event);
		
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		if (event == null) {
			return;
		}
		
		// 連線狀態
		KeeperState keeperState = event.getState();
		// 事件型別
		EventType eventType = event.getType();
		// 受影響的path
		String path = event.getPath();
		
		String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

		System.out.println(logPrefix + "收到Watcher通知");
		System.out.println(logPrefix + "連線狀態:\t" + keeperState.toString());
		System.out.println(logPrefix + "事件型別:\t" + eventType.toString());

		if (KeeperState.SyncConnected == keeperState) {
			// 成功連線上ZK伺服器
			if (EventType.None == eventType) {
				System.out.println(logPrefix + "成功連線上ZK伺服器");
				connectedSemaphore.countDown();
			} 
			//建立節點
			else if (EventType.NodeCreated == eventType) {
				System.out.println(logPrefix + "節點建立");
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				this.exists(path, true);
			} 
			//更新節點
			else if (EventType.NodeDataChanged == eventType) {
				System.out.println(logPrefix + "節點資料更新");
				System.out.println("我看看走不走這裡........");
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(logPrefix + "資料內容: " + this.readData(PARENT_PATH, true));
			} 
			//更新子節點
			else if (EventType.NodeChildrenChanged == eventType) {
				System.out.println(logPrefix + "子節點變更");
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(logPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
			} 
			//刪除節點
			else if (EventType.NodeDeleted == eventType) {
				System.out.println(logPrefix + "節點 " + path + " 被刪除");
			}
			else ;
		} 
		else if (KeeperState.Disconnected == keeperState) {
			System.out.println(logPrefix + "與ZK伺服器斷開連線");
		} 
		else if (KeeperState.AuthFailed == keeperState) {
			System.out.println(logPrefix + "許可權檢查失敗");
		} 
		else if (KeeperState.Expired == keeperState) {
			System.out.println(logPrefix + "會話失效");
		}
		else ;

		System.out.println("--------------------------------------------");

	}

	/**
	 * <B>方法名稱:</B>測試zookeeper監控<BR>
	 * <B>概要說明:</B>主要測試watch功能<BR>
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {

		//建立watcher
		ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
		//建立連線
		zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
		//System.out.println(zkWatch.zk.toString());
		
		Thread.sleep(1000);
		
		// 清理節點
		zkWatch.deleteAllTestPath();
		
		if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
			
			Thread.sleep(1000);
			
			
			// 讀取資料,在操作節點資料之前先呼叫zookeeper的getData()方法是為了可以watch到對節點的操作。watch是一次性的,
			// 也就是說,如果第二次又重新呼叫了setData()方法,在此之前需要重新呼叫一次。
			System.out.println("---------------------- read parent ----------------------------");
			//zkWatch.readData(PARENT_PATH, true);
			
			/** 讀取子節點,設定對子節點變化的watch,如果不寫該方法,則在建立子節點是隻會輸出NodeCreated,而不會輸出NodeChildrenChanged,
				也就是說建立子節點時沒有watch。
				如果是遞迴的建立子節點,如path="/p/c1/c2"的話,getChildren(PARENT_PATH, ture)只會在建立c1時watch,輸出c1的NodeChildrenChanged,
				而不會輸出建立c2時的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,則需要再呼叫一次getChildren(String path, true)方法,
				其中path="/p/c1"
			*/
			System.out.println("---------------------- read children path ----------------------------");
			zkWatch.getChildren(PARENT_PATH, true);

			// 更新資料
			zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
			
			Thread.sleep(1000);
			
			// 建立子節點,同理如果想要watch到NodeChildrenChanged狀態,需要呼叫getChildren(CHILDREN_PATH, true)
			zkWatch.readData(CHILDREN_PATH, true);
			zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
			
			Thread.sleep(1000);
			
			zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
		}
		
		Thread.sleep(50000);
		// 清理節點
		zkWatch.deleteAllTestPath();
		Thread.sleep(1000);
		zkWatch.releaseConnection();
	}

}

Watcher的特性:一次性、客戶端序列執行、輕量。

一次性:對於ZK的Watcher,只需要記住一點:Zookeeper的watch事件是一次性觸發的。當watch監視的資料發生變化時,通知設定了該watch的客戶端,即watcher。由於zookeeper的監視都是一次性的,所以每次必須設定監控。

客戶端序列執行:客戶端Watcher回撥的過程是一個串行同步的過程,這為我們保證了順序,同時需要注意一點,千萬不要因為一個Watcher的處理邏輯影響了這個客戶端的Watcher回撥。

輕量:WatchedEvent是Zookeeper整個Wacher通知機制的最小通知單元,整個資料結構只包含三部分:通知狀態、事件型別和節點路徑。也就是說Watcher通知非常的簡單,只會告訴客戶端發生了事件而不會告知其具體內容,需要客戶端自己去獲取,比如NodeDataChanged事件,Zookeeper只會通知客戶端指定節點的資料發生了變更,而不會直接提供具體的資料內容。

Zookeeper的ACL(AUTH):

ACL(Access Control List),Zookeeper作為一個分散式協調框架,其內部儲存的都是一些關乎分散式系統執行時狀態的元資料,尤其是涉及到一些分散式鎖、Master選舉和協調等應用場景。我們需要有效的保障Zookeeper中的資料安全,Zookeeper提供了一套完善的ACL許可權控制機制來保障資料的安全。

Zookeeper提供了三種模式,許可權模式、授權物件、許可權:

許可權模式:Scheme,開發人員經常使用如下四種許可權模式:

①IP:ip模式通過ip地址粒度來進行許可權控制,例如配置了:ip:192.168.1.107,即表示許可權控制都是針對這個ip地址的,同時也支援按網段分配,比如:192.168.1.*。

②Digest:digest是最常用的許可權控制模式,也更符合對許可權的認知。其類似於“username:password”形式的許可權控制標識進行許可權配置。Zookeeper會對形成的許可權標識先後進行兩次編碼處理,分別是SHA-1加密演算法和BASE64編碼。

③World:World是一種最開放的許可權控制模式。這種模式可以看做為特殊的digest,它僅僅是一個標識而已。

④Super:超級使用者模式。在超級使用者模式下可以對Zookeeper進行任意操作。

許可權物件:指的是許可權賦予給使用者或者一個指定的實體,例如IP地址或機器等。在不同的模式下,授權物件是不同的。這種模式和授權物件一一對應。

許可權:許可權就是指那些通過許可權檢測後可以被允許執行的操作,在Zookeeper中,對資料的操作許可權分為以下五大類:

CREATE、DELETE、READ、WRITE、ADMIN

public class ZookeeperAuth implements Watcher {

	/** 連線地址 */
	final static String CONNECT_ADDR = "192.168.80.88:2181";
	/** 測試路徑 */
	final static String PATH = "/testAuth";
	final static String PATH_DEL = "/testAuth/delNode";
	/** 認證型別 */
	final static String authentication_type = "digest";
	/** 認證正確方法 */
	final static String correctAuthentication = "123456";
	/** 認證錯誤方法 */
	final static String badAuthentication = "654321";
	
	static ZooKeeper zk = null;
	/** 計時器 */
	AtomicInteger seq = new AtomicInteger();
	/** 標識 */
	private static final String LOG_PREFIX_OF_MAIN = "【Main】";
	
	private CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	@Override
	public void process(WatchedEvent event) {
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if (event==null) {
			return;
		}
		// 連線狀態
		KeeperState keeperState = event.getState();
		// 事件型別
		EventType eventType = event.getType();
		// 受影響的path
		String path = event.getPath();
		
		String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

		System.out.println(logPrefix + "收到Watcher通知");
		System.out.println(logPrefix + "連線狀態:\t" + keeperState.toString());
		System.out.println(logPrefix + "事件型別:\t" + eventType.toString());
		if (KeeperState.SyncConnected == keeperState) {
			// 成功連線上ZK伺服器
			if (EventType.None == eventType) {
				System.out.println(logPrefix + "成功連線上ZK伺服器");
				connectedSemaphore.countDown();
			} 
		} else if (KeeperState.Disconnected == keeperState) {
			System.out.println(logPrefix + "與ZK伺服器斷開連線");
		} else if (KeeperState.AuthFailed == keeperState) {
			System.out.println(logPrefix + "許可權檢查失敗");
		} else if (KeeperState.Expired == keeperState) {
			System.out.println(logPrefix + "會話失效");
		}
		System.out.println("--------------------------------------------");
	}
	/**
	 * 建立ZK連線
	 * 
	 * @param connectString
	 *            ZK伺服器地址列表
	 * @param sessionTimeout
	 *            Session超時時間
	 */
	public void createConnection(String connectString, int sessionTimeout) {
		this.releaseConnection();
		try {
			zk = new ZooKeeper(connectString, sessionTimeout, this);
			//新增節點授權
			zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());
			System.out.println(LOG_PREFIX_OF_MAIN + "開始連線ZK伺服器");
			//倒數等待
			connectedSemaphore.await();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 關閉ZK連線
	 */
	public void releaseConnection() {
		if (this.zk!=null) {
			try {
				this.zk.close();
			} catch (InterruptedException e) {
			}
		}
	}
	
	/**
	 * 
	 * <B>方法名稱:</B>測試函式<BR>
	 * <B>概要說明:</B>測試認證<BR>
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		
		ZookeeperAuth testAuth = new ZookeeperAuth();
		testAuth.createConnection(CONNECT_ADDR,2000);
		List<ACL> acls = new ArrayList<ACL>(1);
		for (ACL ids_acl : Ids.CREATOR_ALL_ACL) {
			acls.add(ids_acl);
		}

		try {
			zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);
			System.out.println("使用授權key:" + correctAuthentication + "建立節點:"+ PATH + ", 初始內容是: init content");
		} catch (Exception e) {
			e.printStackTrace();
		}
		try {
			zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);
			System.out.println("使用授權key:" + correctAuthentication + "建立節點:"+ PATH_DEL + ", 初始內容是: init content");
		} catch (Exception e) {
			e.printStackTrace();
		}

		// 獲取資料
		getDataByNoAuthentication();
		getDataByBadAuthentication();
		getDataByCorrectAuthentication();

		// 更新資料
		updateDataByNoAuthentication();
		updateDataByBadAuthentication();
		updateDataByCorrectAuthentication();

		// 刪除資料
		deleteNodeByBadAuthentication();
		deleteNodeByNoAuthentication();
		deleteNodeByCorrectAuthentication();
		//
		Thread.sleep(1000);
		
		deleteParent();
		//釋放連線
		testAuth.releaseConnection();
	}
	/** 獲取資料:採用錯誤的密碼 */
	static void getDataByBadAuthentication() {
		String prefix = "[使用錯誤的授權資訊]";
		try {
			ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			//授權
			badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
			Thread.sleep(2000);
			System.out.println(prefix + "獲取資料:" + PATH);
			System.out.println(prefix + "成功獲取資料:" + badzk.getData(PATH, false, null));
		} catch (Exception e) {
			System.err.println(prefix + "獲取資料失敗,原因:" + e.getMessage());
		}
	}

	/** 獲取資料:不採用密碼 */
	static void getDataByNoAuthentication() {
		String prefix = "[不使用任何授權資訊]";
		try {
			System.out.println(prefix + "獲取資料:" + PATH);
			ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			Thread.sleep(2000);
			System.out.println(prefix + "成功獲取資料:" + nozk.getData(PATH, false, null));
		} catch (Exception e) {
			System.err.println(prefix + "獲取資料失敗,原因:" + e.getMessage());
		}
	}

	/** 採用正確的密碼 */
	static void getDataByCorrectAuthentication() {
		String prefix = "[使用正確的授權資訊]";
		try {
			System.out.println(prefix + "獲取資料:" + PATH);
			
			System.out.println(prefix + "成功獲取資料:" + zk.getData(PATH, false, null));
		} catch (Exception e) {
			System.out.println(prefix + "獲取資料失敗,原因:" + e.getMessage());
		}
	}

	/**
	 * 更新資料:不採用密碼
	 */
	static void updateDataByNoAuthentication() {

		String prefix = "[不使用任何授權資訊]";

		System.out.println(prefix + "更新資料: " + PATH);
		try {
			ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			Thread.sleep(2000);
			Stat stat = nozk.exists(PATH, false);
			if (stat!=null) {
				nozk.setData(PATH, prefix.getBytes(), -1);
				System.out.println(prefix + "更新成功");
			}
		} catch (Exception e) {
			System.err.println(prefix + "更新失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 更新資料:採用錯誤的密碼
	 */
	static void updateDataByBadAuthentication() {

		String prefix = "[使用錯誤的授權資訊]";

		System.out.println(prefix + "更新資料:" + PATH);
		try {
			ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			//授權
			badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
			Thread.sleep(2000);
			Stat stat = badzk.exists(PATH, false);
			if (stat!=null) {
				badzk.setData(PATH, prefix.getBytes(), -1);
				System.out.println(prefix + "更新成功");
			}
		} catch (Exception e) {
			System.err.println(prefix + "更新失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 更新資料:採用正確的密碼
	 */
	static void updateDataByCorrectAuthentication() {

		String prefix = "[使用正確的授權資訊]";

		System.out.println(prefix + "更新資料:" + PATH);
		try {
			Stat stat = zk.exists(PATH, false);
			if (stat!=null) {
				zk.setData(PATH, prefix.getBytes(), -1);
				System.out.println(prefix + "更新成功");
			}
		} catch (Exception e) {
			System.err.println(prefix + "更新失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 不使用密碼 刪除節點
	 */
	static void deleteNodeByNoAuthentication() throws Exception {

		String prefix = "[不使用任何授權資訊]";

		try {
			System.out.println(prefix + "刪除節點:" + PATH_DEL);
			ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			Thread.sleep(2000);
			Stat stat = nozk.exists(PATH_DEL, false);
			if (stat!=null) {
				nozk.delete(PATH_DEL,-1);
				System.out.println(prefix + "刪除成功");
			}
		} catch (Exception e) {
			System.err.println(prefix + "刪除失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 採用錯誤的密碼刪除節點
	 */
	static void deleteNodeByBadAuthentication() throws Exception {

		String prefix = "[使用錯誤的授權資訊]";

		try {
			System.out.println(prefix + "刪除節點:" + PATH_DEL);
			ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
			//授權
			badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
			Thread.sleep(2000);
			Stat stat = badzk.exists(PATH_DEL, false);
			if (stat!=null) {
				badzk.delete(PATH_DEL, -1);
				System.out.println(prefix + "刪除成功");
			}
		} catch (Exception e) {
			System.err.println(prefix + "刪除失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 使用正確的密碼刪除節點
	 */
	static void deleteNodeByCorrectAuthentication() throws Exception {

		String prefix = "[使用正確的授權資訊]";

		try {
			System.out.println(prefix + "刪除節點:" + PATH_DEL);
			Stat stat = zk.exists(PATH_DEL, false);
			if (stat!=null) {
				zk.delete(PATH_DEL, -1);
				System.out.println(prefix + "刪除成功");
			}
		} catch (Exception e) {
			System.out.println(prefix + "刪除失敗,原因是:" + e.getMessage());
		}
	}

	/**
	 * 使用正確的密碼刪除節點
	 */
	static void deleteParent() throws Exception {
		try {
			Stat stat = zk.exists(PATH_DEL, false);
			if (stat == null) {
				zk.delete(PATH, -1);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

相關推薦

Zookeeper——2使用Zookeeper原生API操作Zookeeper

zookeeper的javaclient可以使我們更輕鬆的實現對zookeeper的各種操作,要使用java操作zookeeper,需要引入zookeeper-3.4.5.jar和zkclient-0.1.jar。zookeeper-3.4.5.jar是官方提供的JAVA

使用ZooKeeper提供的Java API操作ZooKeeper

zookeeper 服務協調框架 分布式 集群 Java API 建立客戶端與zk服務端的連接 我們先來創建一個普通的maven工程,然後在pom.xml文件中配置zookeeper依賴: <dependencies> <dependency>

使用Java API操作zookeeper的acl權限

zookeeper Java API 分布式 服務協調框架 ACL權限 默認匿名權限 ZooKeeper提供了如下幾種驗證模式(scheme): digest:Client端由用戶名和密碼驗證,譬如user:password,digest的密碼生成方式是Sha1摘要的base64形式 a

Java API 操作 zookeeper

1.需要的jar包: 1.1 zookeeper的核心包:zookeeper-3.4.5.jar 1.2 log4j的jar包:log4j-1.2.15.jar 1.3 netty的jar包:netty-3.2.2.Final.jar 1.4 slf4j的jar

18 大資料zookeeper --使用java api操作zookeeper

ZooKeeper服務命令: 在準備好相應的配置之後,可以直接通過zkServer.sh 這個指令碼進行服務的相關操作 1. 啟動ZK服務: sh bin/zkServer.sh start 2. 檢視ZK服務狀態: sh bin/zkServer.sh status 3. 停止

zookeeper客戶端使用原生JavaApi操作節點

開發 vat closed 路徑 就是 list disco override () 1.引入依賴 <dependency> <groupId>org.apache.zookeeper</groupId>

2hive的基本操作

like -s txt code del class ext data 數據 1、創建表 hive>CREATE TABLE userTables(id INT,name STRING); 或者 hive> CREATE TABLE userTabl

2其他類API------calendar

clas cep int 相關 () 打印 static calendar ack 獲取日期中的指定日期信息比如獲取年,月,日 1 package cn.itcast.api.a.date; 2 3 import java.util.Calendar; 4

Spark SQL原理與DataFrameDataSet相關API操作以及程式碼介紹

//一. DataFrame建立//    1.json檔案//    val df = sqlContext.read.json("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")//    

使用Java APICurator操作zookeeper的acl許可權

zk原生api操作acl許可權 預設匿名許可權 ZooKeeper提供瞭如下幾種驗證模式(scheme): digest:Client端由使用者名稱和密碼驗證,譬如user:password,digest的密碼生成方式是Sha1摘要的base64形式 auth:不使用任何id

Apache Curator操作zookeeperAPI使用

zookeeper 分布式 集群 curator 中間件 curator簡介與客戶端之間的異同點 常用的zookeeper java客戶端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之處: 在

Zookeeper學習(三) 客戶端和原生API

safe ima call proc string 過程 心跳 current catch 前言 在這篇博客裏我會主要總結下兩個部分的操作: 在安裝ZooKeeper的機器上利用ZKClient連接Zookeeper的集群,然後利用相應的命令做一些簡單的操作。相信很多沒有

zookeeper概念應用場景資料組織叢集搭建客戶端操作Java客戶端curator

  一、zookeeper簡介      1.1 zookeeper簡介      Apache的很多專案以動物來命令,比如Hadoop(大象)、Hive(小蜜蜂)、Pig(豬豬),這些專案都是hadoop生態系統的成員。Hadoop生態系統是為了解決大資料儲存、大資料計算和大資料資料分析的,解決大

Zookeeper從入門到精通系列之--1.ZookeeperAPI操作

  一 IDEA環境搭建 1.1 建立一個maven專案 在pom.xml新增如下依賴 <dependencies> <dependency> <groupId>junit</gr

zookeeper客戶端api操作

這裡記錄zookeeper java客戶端api的使用。 客戶端建立Zookeeper例項,然後呼叫這個類提供的方法與zookeeper伺服器進行互動。 Zookeeper的建構函式有如下4種: ZooKeeper(connectString, sessionT

基於ZooKeeper原生API實現分散式鎖

其實實現分散式鎖主要需要有一個第三方中介軟體能夠提供鎖的儲存和鎖的釋放。像資料庫、Redis、ZooKeeper都是常用的分散式鎖解決方案。 分析 根據ZK節點的特性,在同一時間內,只會有一個客戶端建立/Locks/lock節點成功,失敗的節點則會監聽/Locks/loc

ZookeeperAPI操作以及ACL許可權

Zookeeper的ACL許可權 1. ACL的簡介 首先說明一下為什麼需要ACL 簡單來說 :在通常情況下,zookeeper允許未經授權的訪問,因此在安全漏洞掃描中暴漏未授權訪問漏洞。這在一些監控很嚴的系統中是不被允許的,所以需要A

zookeeper執行環境23:單節點安裝和偽分散式叢集安裝

轉載:http://www.aboutyun.com/thread-9097-1-1.html 問題導讀: 1.什麼是zookeeper 2.zookeeper有幾種安裝方式? 3.zookeeper偽分佈如何配置myid? 4.zookeeper包含哪些常用操作命令? 前

Zookeeper——4使用Curator操作Zookeeper

為了更好的實現Java操作zookeeper伺服器,後來出現了Curator框架,非常的強大,目前已經是Apache的頂級專案,裡面提供了更多豐富的操作,例如session超時重連、主從選舉、分散式計數器、分散式鎖等等適用於各種複雜的zookeeper場景的API封裝。(z

Apache Curator操作zookeeperAPI使用——watcher

curator在註冊watch事件上,提供了一個usingWatcher方法,使用這個方法註冊的watch事件和預設watch事件一樣,監聽只會觸發一次,監聽完畢後就會銷燬,也就是一次性的。而這個方法有兩種引數可選,一個是zk原生API的Watcher介面的實現類,另一個是Curator提供的Cur