zookeeper 工具類以及利用InterProcessLock 建立分散式鎖
阿新 • • 發佈:2019-02-10
有些介面未測試;待後續優化 分散式鎖可參考http://surlymo.iteye.com/blog/2082684 實現
package com.ai.runner.test.lock; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; import org.apache.curator.framework.api.BackgroundPathAndBytesable; import org.apache.curator.framework.api.BackgroundPathable; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * Date: 2016年5月28日 <br> * * @author zhoushanbin */ public class ZkClient { private String zkAddr; private int timeOut; private String authSchema; private String authInfo; private CuratorFramework client; public ZkClient(String zkAddr, int timeOut, String namespace) throws Exception{ this(zkAddr,timeOut,namespace,null); } /** * 獲取zk 連線客戶端 * @param zkAddr zk地址 ip:port,ip:port,ip:port * @param timeOut 連線超時ms * @param namespace 所有的操作都是在 /namespace 下的節點操作 * @param acl Access Control List(訪問控制列表)。Znode被建立時帶有一個ACL列表<br> * acl 主要由三個維度:schema,id,permision 控制節點許可權 <br> * eg:<br> * Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("username:password"));<br> * ACL acl = new ACL(ZooDefs.Perms.ALL, id); <br> * <br> * 維度 schema: <br> * 1:digest 使用者名稱+密碼驗證 它對應的維度id=username:BASE64(SHA1(password))<br> * 2:host 客戶端主機名hostname驗證 <br> * 3:ip 它對應的維度id=客戶機的IP地址,設定的時候可以設定一個ip段,比如ip:192.168.1.0/16, 表示匹配前16個bit的IP段<br> * 4:auth 使用sessionID驗證 <br> * 5:world 無驗證,預設是無任何許可權 它下面只有一個id, 叫anyone <br> * 6:super: 在這種scheme情況下,對應的id擁有超級許可權,可以做任何事情(cdrwa) <br> * 7:sasl: sasl的對應的id,是一個通過了kerberos認證的使用者id <br> * <br> * 維度:permision <br> * ZooDefs.Perms.READ 讀許可權<br> * ZooDefs.Perms.WRITE 寫許可權<br> * ZooDefs.Perms.CREATE 建立節點許可權<br> * ZooDefs.Perms.DELETE 刪除節點許可權<br> * ZooDefs.Perms.ADMIN 能設定許可權<br> * ZooDefs.Perms.ALL 所有許可權<br> * ALL = READ | WRITE | CREATE | DELETE | ADMIN<br> * @throws Exception */ public ZkClient(String zkAddr, int timeOut, String namespace,ACL acl) throws Exception{ this.zkAddr = zkAddr; if (timeOut > 0) { this.timeOut = timeOut; } if (null != acl) { this.authSchema = acl.getId().getScheme(); this.authInfo = acl.getId().getId(); } CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory .builder().connectString(this.zkAddr).namespace(StringUtils.isEmpty(namespace)?"":namespace) .connectionTimeoutMs(this.timeOut) .retryPolicy(new RetryNTimes(5, 10)); if ((!StringUtils.isBlank(this.authSchema)) && (!StringUtils.isBlank(this.authInfo))) { builder.authorization(this.authSchema, this.authInfo.getBytes()); } this.client = builder.build(); this.client.start(); this.client.blockUntilConnected(5, TimeUnit.SECONDS); } /** * 建立一個所有許可權節點即schema:world;id:annyone;permision:ZooDefs.Perms.ALL * @param nodePath 建立的結點路徑 * @param data 節點資料 * @param createMode 節點模式 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createNode(String nodePath, String data, CreateMode createMode,boolean recursion) throws Exception { createNode(nodePath, ZooDefs.Ids.OPEN_ACL_UNSAFE, data, createMode,recursion); } /** * 建立節點 * @param nodePath 建立節點的路徑 * @param acls 節點控制權限列表 * @param data 節點存放的資料 * @param createMode 建立節點的模式 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * 節點模式CreateMode<br> * 1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br> * 2:CreateMode.EPHEMERAL_SEQUENTIAL 臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br> * 3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br> * 4:CreateMode.PERSISTENT_SEQUENTIAL 順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br> * @throws Exception */ public void createNode(String nodePath, List<ACL> acls, String data, CreateMode createMode,boolean recursion) throws Exception { byte[] bytes = null; if (!StringUtils.isBlank(data)) { bytes = data.getBytes("UTF-8"); } createNode(nodePath, acls, bytes, createMode,recursion); } /** * @param nodePath 建立節點的路徑 * @param acls 節點控制權限列表 * @param data 節點存放的資料 * @param createMode 建立節點的模式 * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * 節點模式CreateMode<br> * 1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br> * 2:CreateMode.EPHEMERAL_SEQUENTIAL 臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br> * 3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br> * 4:CreateMode.PERSISTENT_SEQUENTIAL 順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br> * @throws Exception */ public void createNode(String nodePath, List<ACL> acls, byte[] data, CreateMode createMode,boolean recursion) throws Exception { if(recursion){ ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().creatingParentsIfNeeded().withMode(createMode)) .withACL(acls)).forPath(nodePath, data); } else{ ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().withMode(createMode)) .withACL(acls)).forPath(nodePath, data); } } /** * 建立一個所有許可權的永久節點 * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createPersitentNode(String nodePath, String data,boolean recursion) throws Exception { createNode(nodePath, data, CreateMode.PERSISTENT,recursion); } /** * 建立一個所有許可權的零時節點 * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createEphemeralNode(String nodePath, String data,boolean recursion) throws Exception { createNode(nodePath, data, CreateMode.EPHEMERAL,recursion); } /** * 建立一個帶許可權的永久節點 * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createPersitentNodeWithAcl(String nodePath, String data,List<ACL> acls,boolean recursion) throws Exception { createNode(nodePath, acls, data, CreateMode.PERSISTENT,recursion); } /** * 建立一個帶許可權的零時節點 * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createEphemeralNodeAcl(String nodePath, String data,List<ACL> acls,boolean recursion) throws Exception { createNode(nodePath, acls, data, CreateMode.EPHEMERAL,recursion); } /** * 建立序列節點且當父節點不存在時建立父節點 * @param nodePath * @param acls 可參考:ZooDefs.Ids * @param createMode * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createSeqNode(String nodePath,List<ACL> acls,CreateMode createMode,boolean recursion) throws Exception { if(recursion){ ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().creatingParentsIfNeeded() .withMode(createMode)) .withACL(acls)).forPath(nodePath); } else{ ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create() .withMode(createMode)) .withACL(acls)).forPath(nodePath); } } /** * 存在返回 節點stat 資訊;否則返回null * @param path * @return * @throws Exception */ public Stat exists(String path) throws Exception { return this.client.checkExists().forPath(path); } /** * 判斷節點是否存在,存在則註冊節點監視器 * @param path * @param watcher * @return */ public boolean exists(String path, Watcher watcher) throws Exception { if (null != watcher) { return null != ((BackgroundPathable<?>) this.client.checkExists().usingWatcher(watcher)).forPath(path); } return null != this.client.checkExists().forPath(path); } /** * 判斷是否處於連線狀態 * @return */ public boolean isConnected() { if ((null == this.client) || (!CuratorFrameworkState.STARTED.equals(this.client .getState()))) { return false; } return true; } public void retryConnection() { this.client.start(); } /** * 獲取連線客戶端 * @return */ public CuratorFramework getInnerClient(){ return this.client; } /** * 關閉連線 */ public void quit() { if ((null != this.client) && (CuratorFrameworkState.STARTED .equals(this.client.getState()))) { this.client.close(); } } /** * 刪除節點 * @param path * @param deleChildren * @throws Exception */ public void deleteNode(String path,boolean deleChildren) throws Exception { if(deleChildren){ this.client.delete().guaranteed().deletingChildrenIfNeeded() .forPath(path); } else{ this.client.delete().forPath(path); } } /** * 設定節點資料 * @param nodePath * @param data * @throws Exception */ public void setNodeData(String nodePath, String data) throws Exception { byte[] bytes = null; if (!StringUtils.isBlank(data)) { bytes = data.getBytes("UTF-8"); } setNodeData(nodePath, bytes); } /** * 設定節點資料 * @param nodePath * @param data * @throws Exception */ public void setNodeData(String nodePath, byte[] data) throws Exception { this.client.setData().forPath(nodePath, data); } public String getNodeData(String nodePath, boolean watch) throws Exception { byte[] data; if (watch) { data = (byte[]) ((BackgroundPathable<?>) this.client.getData() .watched()).forPath(nodePath); } else { data = (byte[]) this.client.getData().forPath(nodePath); } if ((null == data) || (data.length <= 0)) { return null; } return new String(data, "UTF-8"); } public String getNodeData(String nodePath) throws Exception { return getNodeData(nodePath, false); } public String getNodeData(String nodePath, Watcher watcher) throws Exception { byte[] data = getNodeBytes(nodePath, watcher); return new String(data, "UTF-8"); } public byte[] getNodeBytes(String nodePath, Watcher watcher) throws Exception { byte[] bytes = null; if (null != watcher) { bytes = (byte[]) ((BackgroundPathable<?>) this.client.getData() .usingWatcher(watcher)).forPath(nodePath); } else { bytes = (byte[]) this.client.getData().forPath(nodePath); } return bytes; } public byte[] getNodeBytes(String nodePath) throws Exception { return getNodeBytes(nodePath, null); } @SuppressWarnings("unchecked") public List<String> getChildren(String nodePath, Watcher watcher) throws Exception { return (List<String>) ((BackgroundPathable<?>) this.client .getChildren().usingWatcher(watcher)).forPath(nodePath); } public List<String> getChildren(String path) throws Exception { return (List<String>) this.client.getChildren().forPath(path); } @SuppressWarnings("unchecked") public List<String> getChildren(String path, boolean watcher) throws Exception { if (watcher) { return (List<String>) ((BackgroundPathable<?>) this.client .getChildren().watched()).forPath(path); } return (List<String>) this.client.getChildren().forPath(path); } public ZkClient addAuth(String authSchema, String authInfo) throws Exception { synchronized (ZkClient.class) { this.client.getZookeeperClient().getZooKeeper() .addAuthInfo(authSchema, authInfo.getBytes()); } return this; } /** * 分散式鎖 * @param lockPath * @return */ public InterProcessLock getInterProcessLock(String lockPath) { return new InterProcessMutex(this.client, lockPath); } }
package com.ai.runner.test.lock; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.zookeeper.data.ACL; /** * @date 2017年4月19日 * @author zhoushanbin * */ public class ZkClientUtils { private static ZkClient zkClient; public static ZkClient getZkClient(String zkAddr, int timeOut, String namespace,ACL acl) throws Exception{ if(null != zkClient){ return zkClient; } synchronized (ZkClientUtils.class) { if(null != zkClient){ return zkClient; } zkClient = new ZkClient(zkAddr, timeOut, namespace, acl); } return zkClient; } /*******************************************************************************************/ public static String address = "ip:29181"; public static void main(String args[]){ //testDistributeLock(); testCreateNode(); } public static void testAcl(){ } public static void testCreateNode(){ try { ZkClient zkClient = ZkClientUtils.getZkClient(address, 30, "ns", null); zkClient.createPersitentNode("/test/qaz/t2/t/t", "data", true); } catch (Exception e) { e.printStackTrace(); } } public static void testDeleteNode(){ } public static void testSetData(){ } public static void testGetChildren(){ } public static void testGetData(){ } public static void testDistributeLock(){ for(int i=0;i<50;i++){ new Thread(){ @Override public void run() { InterProcessLock lock = null; try{ ZkClient zkClient = ZkClientUtils.getZkClient(address, 30, "dislock", null); lock = zkClient.getInterProcessLock("/distributeLock"); System.out.println(Thread.currentThread().getName()+"申請鎖"); lock.acquire(); System.out.println(Thread.currentThread().getName()+"持有鎖"); Thread.sleep(500); } catch(Exception e){ e.printStackTrace(); } finally{ if(null != lock){ try { lock.release(); System.out.println(Thread.currentThread().getName()+"釋放有鎖"); } catch (Exception e) { e.printStackTrace(); } } } } }.start(); } } }