1. 程式人生 > >zookeeper 工具類以及利用InterProcessLock 建立分散式鎖

zookeeper 工具類以及利用InterProcessLock 建立分散式鎖

有些介面未測試;待後續優化 分散式鎖可參考http://surlymo.iteye.com/blog/2082684 實現
package com.ai.runner.test.lock;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;


/**
 * Date: 2016年5月28日 <br>
 * 
 * @author zhoushanbin
 */
public class ZkClient {

	private String zkAddr;
	private int timeOut;
	private String authSchema;
	private String authInfo;
	private CuratorFramework client;

	
	
	public ZkClient(String zkAddr, int timeOut, String namespace) throws Exception{
		
		this(zkAddr,timeOut,namespace,null);
		
	}
	
	/**
	 * 獲取zk 連線客戶端
	 * @param zkAddr zk地址 ip:port,ip:port,ip:port
	 * @param timeOut 連線超時ms
	 * @param namespace 所有的操作都是在 /namespace 下的節點操作
	 * @param acl  Access Control List(訪問控制列表)。Znode被建立時帶有一個ACL列表<br>
	 * acl 主要由三個維度:schema,id,permision 控制節點許可權 <br>
	 * eg:<br>
	 * Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("username:password"));<br>  
	 * ACL acl = new ACL(ZooDefs.Perms.ALL, id); <br>
	 * <br>
	 * 維度 schema: <br>
	 * 1:digest 使用者名稱+密碼驗證 它對應的維度id=username:BASE64(SHA1(password))<br>
	 * 2:host 客戶端主機名hostname驗證 <br>
	 * 3:ip 它對應的維度id=客戶機的IP地址,設定的時候可以設定一個ip段,比如ip:192.168.1.0/16, 表示匹配前16個bit的IP段<br>
	 * 4:auth 使用sessionID驗證 <br>
	 * 5:world 無驗證,預設是無任何許可權  它下面只有一個id, 叫anyone  <br>
	 * 6:super: 在這種scheme情況下,對應的id擁有超級許可權,可以做任何事情(cdrwa)  <br>
	 * 7:sasl: sasl的對應的id,是一個通過了kerberos認證的使用者id  <br>
	 * <br>
	 * 維度:permision <br>
	 * ZooDefs.Perms.READ 讀許可權<br>
	 * ZooDefs.Perms.WRITE 寫許可權<br>
	 * ZooDefs.Perms.CREATE 建立節點許可權<br>
	 * ZooDefs.Perms.DELETE 刪除節點許可權<br>
	 * ZooDefs.Perms.ADMIN 能設定許可權<br>
	 * ZooDefs.Perms.ALL 所有許可權<br>
	 * ALL = READ | WRITE | CREATE | DELETE | ADMIN<br>
	 * @throws Exception 
	 */
	
	public ZkClient(String zkAddr, int timeOut, String namespace,ACL acl) throws Exception{
		this.zkAddr = zkAddr;
		if (timeOut > 0) {
			this.timeOut = timeOut;
		}
		if (null != acl) {
				this.authSchema = acl.getId().getScheme();
				this.authInfo = acl.getId().getId();
		}
		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory
				.builder().connectString(this.zkAddr).namespace(StringUtils.isEmpty(namespace)?"":namespace)
				.connectionTimeoutMs(this.timeOut)
				.retryPolicy(new RetryNTimes(5, 10));
		if ((!StringUtils.isBlank(this.authSchema))
				&& (!StringUtils.isBlank(this.authInfo))) {
			builder.authorization(this.authSchema, this.authInfo.getBytes());
		}
		this.client = builder.build();
		this.client.start();
		this.client.blockUntilConnected(5, TimeUnit.SECONDS);
		
	}



	/**
	 * 建立一個所有許可權節點即schema:world;id:annyone;permision:ZooDefs.Perms.ALL
	 * @param nodePath 建立的結點路徑
	 * @param data 節點資料
	 * @param createMode 節點模式
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createNode(String nodePath, String data, CreateMode createMode,boolean recursion)
			throws Exception {
		
		createNode(nodePath, ZooDefs.Ids.OPEN_ACL_UNSAFE, data, createMode,recursion);
	}
	
	
	/**
	 * 建立節點
	 * @param nodePath 建立節點的路徑
	 * @param acls 節點控制權限列表
	 * @param data 節點存放的資料
	 * @param createMode 建立節點的模式
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * 節點模式CreateMode<br>
	 * 1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br>
	 * 2:CreateMode.EPHEMERAL_SEQUENTIAL  臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br>
	 * 3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br>
	 * 4:CreateMode.PERSISTENT_SEQUENTIAL  順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br>
	 * @throws Exception
	 */
	public void createNode(String nodePath, List<ACL> acls, String data,
			CreateMode createMode,boolean recursion) throws Exception {
		byte[] bytes = null;
		if (!StringUtils.isBlank(data)) {
			bytes = data.getBytes("UTF-8");
		}
		createNode(nodePath, acls, bytes, createMode,recursion);
	}

	/**
	 * @param nodePath 建立節點的路徑
	 * @param acls 節點控制權限列表
	 * @param data 節點存放的資料
	 * @param createMode 建立節點的模式
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * 節點模式CreateMode<br>
	 * 1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br>
	 * 2:CreateMode.EPHEMERAL_SEQUENTIAL  臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br>
	 * 3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br>
	 * 4:CreateMode.PERSISTENT_SEQUENTIAL  順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br>
	 * @throws Exception
	 */
	public void createNode(String nodePath, List<ACL> acls, byte[] data,
			CreateMode createMode,boolean recursion) throws Exception {
		if(recursion){
			((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
					.create().creatingParentsIfNeeded().withMode(createMode))
					.withACL(acls)).forPath(nodePath, data);
		}
		else{
			((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
					.create().withMode(createMode))
					.withACL(acls)).forPath(nodePath, data);
		}
	}

	/**
	 * 建立一個所有許可權的永久節點
	 * @param nodePath
	 * @param data
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createPersitentNode(String nodePath, String data,boolean recursion) throws Exception {
		
		createNode(nodePath, data, CreateMode.PERSISTENT,recursion);
	}

	/**
	 * 建立一個所有許可權的零時節點
	 * @param nodePath
	 * @param data
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createEphemeralNode(String nodePath, String data,boolean recursion) throws Exception {
		
		createNode(nodePath, data, CreateMode.EPHEMERAL,recursion);
	}
	
	/**
	 * 建立一個帶許可權的永久節點
	 * @param nodePath
	 * @param data
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createPersitentNodeWithAcl(String nodePath, String data,List<ACL> acls,boolean recursion) throws Exception {
		
		createNode(nodePath, acls, data, CreateMode.PERSISTENT,recursion);
	}

	/**
	 * 建立一個帶許可權的零時節點
	 * @param nodePath
	 * @param data
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createEphemeralNodeAcl(String nodePath, String data,List<ACL> acls,boolean recursion) throws Exception {
		
		createNode(nodePath, acls, data, CreateMode.EPHEMERAL,recursion);
	}
	


	
	/**
	 * 建立序列節點且當父節點不存在時建立父節點
	 * @param nodePath
	 * @param acls 可參考:ZooDefs.Ids
	 * @param createMode
	 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立
	 * @throws Exception
	 */
	public void createSeqNode(String nodePath,List<ACL> acls,CreateMode createMode,boolean recursion) throws Exception {
		if(recursion){
			((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
					.create().creatingParentsIfNeeded()
					.withMode(createMode))
					.withACL(acls)).forPath(nodePath);
		}
		else{
			((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client
					.create()
					.withMode(createMode))
					.withACL(acls)).forPath(nodePath);
		}
	}
	
	/**
	 * 存在返回 節點stat 資訊;否則返回null
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public Stat exists(String path) throws Exception {
		
		return  this.client.checkExists().forPath(path);
	}

	/**
	 * 判斷節點是否存在,存在則註冊節點監視器
	 * @param path
	 * @param watcher
	 * @return
	 */
	public boolean exists(String path, Watcher watcher) throws Exception {
		
		if (null != watcher) {
			return null != ((BackgroundPathable<?>) this.client.checkExists().usingWatcher(watcher)).forPath(path);
		}
		return null != this.client.checkExists().forPath(path);
	}

	/**
	 * 判斷是否處於連線狀態
	 * @return
	 */
	public boolean isConnected() {
		
		if ((null == this.client)
				|| (!CuratorFrameworkState.STARTED.equals(this.client
						.getState()))) {
			return false;
		}
		return true;
	}

	public void retryConnection() {
		this.client.start();
	}

	/**
	 * 獲取連線客戶端
	 * @return
	 */
	public CuratorFramework getInnerClient(){
		
		return this.client;
		
	}
	
	/**
	 * 關閉連線
	 */
	public void quit() {
		
		if ((null != this.client)
				&& (CuratorFrameworkState.STARTED
						.equals(this.client.getState()))) {
			this.client.close();
		}
	}
	
	
	/**
	 * 刪除節點
	 * @param path
	 * @param deleChildren
	 * @throws Exception
	
	 */
	public void deleteNode(String path,boolean deleChildren) throws Exception {
		
		if(deleChildren){
			this.client.delete().guaranteed().deletingChildrenIfNeeded()
					.forPath(path);
		}
		else{
			this.client.delete().forPath(path);
		}
	}
	
	/**
	 * 設定節點資料
	 * @param nodePath
	 * @param data
	 * @throws Exception
	 */
	public void setNodeData(String nodePath, String data) throws Exception {
		
		byte[] bytes = null;
		if (!StringUtils.isBlank(data)) {
			bytes = data.getBytes("UTF-8");
		}
		setNodeData(nodePath, bytes);
	}

	/**
	 * 設定節點資料
	 * @param nodePath
	 * @param data
	 * @throws Exception
	 */
	public void setNodeData(String nodePath, byte[] data) throws Exception {
		this.client.setData().forPath(nodePath, data);
	}
	
	public String getNodeData(String nodePath, boolean watch) throws Exception {
		byte[] data;
		if (watch) {
			data = (byte[]) ((BackgroundPathable<?>) this.client.getData()
					.watched()).forPath(nodePath);
		} else {
			data = (byte[]) this.client.getData().forPath(nodePath);
		}
		if ((null == data) || (data.length <= 0)) {
			return null;
		}
		return new String(data, "UTF-8");
	}

	public String getNodeData(String nodePath) throws Exception {
		return getNodeData(nodePath, false);
	}

	public String getNodeData(String nodePath, Watcher watcher)
			throws Exception {
		byte[] data = getNodeBytes(nodePath, watcher);
		return new String(data, "UTF-8");
	}

	public byte[] getNodeBytes(String nodePath, Watcher watcher)
			throws Exception {
		byte[] bytes = null;
		if (null != watcher) {
			bytes = (byte[]) ((BackgroundPathable<?>) this.client.getData()
					.usingWatcher(watcher)).forPath(nodePath);
		} else {
			bytes = (byte[]) this.client.getData().forPath(nodePath);
		}
		return bytes;
	}

	public byte[] getNodeBytes(String nodePath) throws Exception {
		return getNodeBytes(nodePath, null);
	}
	
	
	@SuppressWarnings("unchecked")
	public List<String> getChildren(String nodePath, Watcher watcher)
			throws Exception {
		return (List<String>) ((BackgroundPathable<?>) this.client
				.getChildren().usingWatcher(watcher)).forPath(nodePath);
	}
	
	public List<String> getChildren(String path) throws Exception {
		return (List<String>) this.client.getChildren().forPath(path);
	}

	@SuppressWarnings("unchecked")
	public List<String> getChildren(String path, boolean watcher)
			throws Exception {
		if (watcher) {
			return (List<String>) ((BackgroundPathable<?>) this.client
					.getChildren().watched()).forPath(path);
		}
		return (List<String>) this.client.getChildren().forPath(path);
	}


	public ZkClient addAuth(String authSchema, String authInfo)
			throws Exception {
		synchronized (ZkClient.class) {
			this.client.getZookeeperClient().getZooKeeper()
					.addAuthInfo(authSchema, authInfo.getBytes());
		}
		return this;
	}
	
	/**
	 * 分散式鎖
	 * @param lockPath
	 * @return
	 */
	public InterProcessLock getInterProcessLock(String lockPath) {
		return new InterProcessMutex(this.client, lockPath);
	}
	
}
package com.ai.runner.test.lock;

import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.zookeeper.data.ACL;

/**
 * @date 2017年4月19日
 * @author zhoushanbin
 *
 */
public class ZkClientUtils {

	private static  ZkClient zkClient;
	
	public static ZkClient getZkClient(String zkAddr, int timeOut, String namespace,ACL acl) throws Exception{
		if(null != zkClient){
			return zkClient;
		}
		synchronized (ZkClientUtils.class) {
			if(null != zkClient){
				return zkClient;
			}
			zkClient = new ZkClient(zkAddr, timeOut, namespace, acl);
		}
		return zkClient;
	}
	
	
/*******************************************************************************************/	
	public static String address = "ip:29181";
	public static void main(String args[]){
		
		//testDistributeLock();
		testCreateNode();
	}
	
	
	
	public static void testAcl(){
		
	}
	
	public static void testCreateNode(){
		try {
			ZkClient zkClient = ZkClientUtils.getZkClient(address, 30, "ns", null);
			zkClient.createPersitentNode("/test/qaz/t2/t/t", "data", true);
			
		} catch (Exception e) {
			
			e.printStackTrace();
		}
	}
	
	public static void testDeleteNode(){
		
	}
	
	public static void testSetData(){
		
	}
	
	public static void testGetChildren(){
		
	}
	
	public static void testGetData(){
		
	}
	
	public static void testDistributeLock(){
		
		for(int i=0;i<50;i++){
			new Thread(){

				@Override
				public void run() {
					InterProcessLock lock = null;
					try{
						ZkClient zkClient = ZkClientUtils.getZkClient(address, 30, "dislock", null);
						lock = zkClient.getInterProcessLock("/distributeLock");
						System.out.println(Thread.currentThread().getName()+"申請鎖");
						lock.acquire();
						System.out.println(Thread.currentThread().getName()+"持有鎖");
						Thread.sleep(500);
					}
					catch(Exception e){
						e.printStackTrace();
					}
					finally{
						if(null != lock){
							try {
								lock.release();
								System.out.println(Thread.currentThread().getName()+"釋放有鎖");
							} catch (Exception e) {
								e.printStackTrace();
							}
						}
					}
				}
				
			}.start();
			
		}
	}
}