zookeeper分散式協調機制及建立分散式鎖
zookeeper基本概念
要了解zookeeper如何建立分散式鎖,先了解一下zookeeper。zookeeper官網給出解釋:Apache ZooKeeper致力於開發和維護開源伺服器,實現高度可靠的分散式協調。
Zookeeper,一種分散式應用的協作服務,是Google的Chubby一個開源的實現,是Hadoop的分散式協調服務,它包含一個簡單的原語集,應用於分散式應用的協作服務,使得分散式應用可以基於這些介面實現諸如同步、配置維護和分叢集或者命名的服務。ZooKeeper:提供通用的分散式鎖服務,用以協調分散式應用.
zookeeper工作原理
zookeeper的核心是原子廣播,這個機制保證了各個server之間的同步,實現這個機制的協議叫做Zab協議.Zab協議有兩種模式,他們分別是恢復模式和廣播模式.
1.當服務啟動或者在領導者崩潰後,Zab就進入了恢復模式,當領導著被選舉出來,且大多數server都完成了和leader的狀態同步後,恢復模式就結束了.狀態同步保證了leader和server具有相同的系統狀態.
2.一旦leader已經和多數的follower進行了狀態同步後,他就可以開始廣播訊息了,即進入廣播狀態.這時候當一個server加入zookeeper服務中,它會在恢復模式下啟動,發下leader,並和leader進行狀態同步,待到同步結束,它也參與廣播訊息.
zookeeper的資料模型
層次化的目錄結構,命名符合常規檔案系統規範
每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識
節點Znode可以包含資料和子節點,但是EPHEMERAL型別的節點不能有子節點
Znode中的資料可以有多個版本,比如某一個路徑下存有多個數據版本,那麼查詢這個路徑下的資料就需要帶上版本
客戶端應用可以在節點上設定監視器,節點不支援部分讀寫,而是一次性完整讀寫
Zoopkeeper 提供了一套很好的分散式叢集管理的機制,就是它這種基於層次型的目錄樹的資料結構,並對樹中的節點進行有效管理,從而可以設計出多種多樣的分散式的資料管理模型。
Zookeeper的節點
Znode有兩種型別,短暫的(ephemeral)和持久的(persistent)
Znode的型別在建立時確定並且之後不能再修改
短暫znode的客戶端會話結束時,zookeeper會將該短暫znode刪除,短暫znode不可以有子節點
持久znode不依賴於客戶端會話,只有當客戶端明確要刪除該持久znode時才會被刪除
Znode有四種形式的目錄節點,PERSISTENT、PERSISTENT_SEQUENTIAL、EPHEMERAL、EPHEMERAL_SEQUENTIAL.
znode 可以被監控,包括這個目錄節點中儲存的資料的修改,子節點目錄的變化等,一旦變化可以通知設定監控的客戶端,這個功能是zookeeper對於應用最重要的特性,
通過這個特性可以實現的功能包括配置的集中管理,叢集管理,分散式鎖等等.
Zookeeper的角色
領導者(leader),負責進行投票的發起和決議,更新系統狀態
學習者(learner),包括跟隨者(follower)和觀察者(observer).
follower用於接受客戶端請求並想客戶端返回結果,在選主過程中參與投票
Observer可以接受客戶端連線,將寫請求轉發給leader,但observer不參加投票過程,只同步leader的狀態,observer的目的是為了擴充套件系統,提高讀取速度
客戶端(client),請求發起方.
Watcher
Watcher 在 ZooKeeper 是一個核心功能,Watcher 可以監控目錄節點的資料變化以及子目錄的變化,一旦這些狀態發生變化,伺服器就會通知所有設定在這個目錄節點上的 Watcher,從而每個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而做出相應的反應
可以設定觀察的操作:exists,getChildren,getData
可以觸發觀察的操作:create,delete,setData
znode以某種方式發生變化時,“觀察”(watch)機制可以讓客戶端得到通知.
可以針對ZooKeeper服務的“操作”來設定觀察,該服務的其他 操作可以觸發觀察.
比如,客戶端可以對某個客戶端呼叫exists操作,同時在它上面設定一個觀察,如果此時這個znode不存在,則exists返回 false,如果一段時間之後,這個znode被其他客戶端建立,則這個觀察會被觸發,之前的那個客戶端就會得到通知.
zookeeper的客戶端封裝的比較好的現在要屬Apache Curator,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。詳情見:ofollow,noindex">https://www.cnblogs.com/seaspring/p/5536338.html
zookeeper分散式鎖的實現
上面講解已經說明了zookeeper主要是通過節點znode的變化來控制全域性鎖的,下面我用程式碼具體呈現。
public class ZookeeperClient { private String zkAddr; private int timeOut; private String authSchema; private String authInfo; private CuratorFramework client; public ZookeeperClient(String zkAddr, int timeOut, String namespace) throws Exception { this(zkAddr, timeOut, namespace, null); } /** * 獲取zk 連線客戶端 * * @param zkAddrzk地址 ip:port,ip:port,ip:port * @param timeOut連線超時ms * @param namespace 所有的操作都是在 /namespace 下的節點操作 * @param aclAccess Control List(訪問控制列表)。Znode被建立時帶有一個ACL列表<br> *acl 主要由三個維度:schema,id,permision 控制節點許可權 <br> *eg:<br> *Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("username:password"));<br> *ACL acl = new ACL(ZooDefs.Perms.ALL, id); <br> *<br> *維度 schema: <br> *1:digest 使用者名稱+密碼驗證 它對應的維度id=username:BASE64(SHA1(password))<br> *2:host 客戶端主機名hostname驗證 <br> *3:ip 它對應的維度id=客戶機的IP地址,設定的時候可以設定一個ip段,比如ip:192.168.1.0/16, 表示匹配前16個bit的IP段<br> *4:auth 使用sessionID驗證 <br> *5:world 無驗證,預設是無任何許可權它下面只有一個id, 叫anyone<br> *6:super: 在這種scheme情況下,對應的id擁有超級許可權,可以做任何事情(cdrwa)<br> *7:sasl: sasl的對應的id,是一個通過了kerberos認證的使用者id<br> *<br> *維度:permision <br> *ZooDefs.Perms.READ 讀許可權<br> *ZooDefs.Perms.WRITE 寫許可權<br> *ZooDefs.Perms.CREATE 建立節點許可權<br> *ZooDefs.Perms.DELETE 刪除節點許可權<br> *ZooDefs.Perms.ADMIN 能設定許可權<br> *ZooDefs.Perms.ALL 所有許可權<br> *ALL = READ | WRITE | CREATE | DELETE | ADMIN<br> * @throws Exception */ public ZookeeperClient(String zkAddr, int timeOut, String namespace, ACL acl) throws Exception { this.zkAddr = zkAddr; if (timeOut > 0) { this.timeOut = timeOut; } if (null != acl) { this.authSchema = acl.getId().getScheme(); this.authInfo = acl.getId().getId(); } CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory .builder().connectString(this.zkAddr).namespace(StringUtils.isEmpty(namespace) ? "" : namespace) .connectionTimeoutMs(this.timeOut) .retryPolicy(new RetryNTimes(5, 10)); if ((!StringUtils.isBlank(this.authSchema)) && (!StringUtils.isBlank(this.authInfo))) { builder.authorization(this.authSchema, this.authInfo.getBytes()); } System.out.println("namespace:"+namespace); this.client = builder.build(); this.client.start(); this.client.blockUntilConnected(5, TimeUnit.SECONDS); } /** * 建立一個所有許可權節點即schema:world;id:annyone;permision:ZooDefs.Perms.ALL * * @param nodePath建立的結點路徑 * @param data節點資料 * @param createMode 節點模式 * @param recursion當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createNode(String nodePath, String data, CreateMode createMode, boolean recursion) throws Exception { createNode(nodePath, ZooDefs.Ids.OPEN_ACL_UNSAFE, data, createMode, recursion); } /** * 建立節點 * * @param nodePath建立節點的路徑 * @param acls節點控制權限列表 * @param data節點存放的資料 * @param createMode 建立節點的模式 * @param recursion當父目錄不存在是否建立 true:建立,fasle:不建立 *節點模式CreateMode<br> *1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br> *2:CreateMode.EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br> *3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br> *4:CreateMode.PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br> * @throws Exception */ public void createNode(String nodePath, List<ACL> acls, String data, CreateMode createMode, boolean recursion) throws Exception { byte[] bytes = null; if (!StringUtils.isBlank(data)) { bytes = data.getBytes("UTF-8"); } createNode(nodePath, acls, bytes, createMode, recursion); } /** * @param nodePath建立節點的路徑 * @param acls節點控制權限列表 * @param data節點存放的資料 * @param createMode 建立節點的模式 * @param recursion當父目錄不存在是否建立 true:建立,fasle:不建立 *節點模式CreateMode<br> *1:CreateMode.EPHEMERAL 建立臨時節點;該節點在客戶端掉線的時候被刪除<br> *2:CreateMode.EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點(可做分散式鎖)<br> *3:CreateMode.PERSISTENT 持久化目錄節點,儲存的資料不會丟失。<br> *4:CreateMode.PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名<br> * @throws Exception */ public void createNode(String nodePath, List<ACL> acls, byte[] data, CreateMode createMode, boolean recursion) throws Exception { if (recursion) { ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().creatingParentsIfNeeded().withMode(createMode)) .withACL(acls)).forPath(nodePath, data); } else { ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().withMode(createMode)) .withACL(acls)).forPath(nodePath, data); } } /** * 建立一個所有許可權的永久節點 * * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createPersitentNode(String nodePath, String data, boolean recursion) throws Exception { createNode(nodePath, data, CreateMode.PERSISTENT, recursion); } /** * 建立一個所有許可權的零時節點 * * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createEphemeralNode(String nodePath, String data, boolean recursion) throws Exception { createNode(nodePath, data, CreateMode.EPHEMERAL, recursion); } /** * 建立一個帶許可權的永久節點 * * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createPersitentNodeWithAcl(String nodePath, String data, List<ACL> acls, boolean recursion) throws Exception { createNode(nodePath, acls, data, CreateMode.PERSISTENT, recursion); } /** * 建立一個帶許可權的臨時節點 * * @param nodePath * @param data * @param recursion 當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createEphemeralNodeAcl(String nodePath, String data, List<ACL> acls, boolean recursion) throws Exception { createNode(nodePath, acls, data, CreateMode.EPHEMERAL, recursion); } /** * 建立序列節點且當父節點不存在時建立父節點 * * @param nodePath * @param acls可參考:ZooDefs.Ids * @param createMode * @param recursion當父目錄不存在是否建立 true:建立,fasle:不建立 * @throws Exception */ public void createSeqNode(String nodePath, List<ACL> acls, CreateMode createMode, boolean recursion) throws Exception { if (recursion) { ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create().creatingParentsIfNeeded() .withMode(createMode)) .withACL(acls)).forPath(nodePath); } else { ((BackgroundPathAndBytesable<?>) ((ACLBackgroundPathAndBytesable<?>) this.client .create() .withMode(createMode)) .withACL(acls)).forPath(nodePath); } } /** * 存在返回 節點stat 資訊;否則返回null * * @param path * @return * @throws Exception */ public Stat exists(String path) throws Exception { return this.client.checkExists().forPath(path); } /** * 判斷節點是否存在,存在則註冊節點監視器 * * @param path * @param watcher * @return */ public boolean exists(String path, Watcher watcher) throws Exception { if (null != watcher) { return null != ((BackgroundPathable<?>) this.client.checkExists().usingWatcher(watcher)).forPath(path); } return null != this.client.checkExists().forPath(path); } /** * 判斷是否處於連線狀態 * * @return */ public boolean isConnected() { if ((null == this.client) || (!CuratorFrameworkState.STARTED.equals(this.client .getState()))) { return false; } return true; } public void retryConnection() { this.client.start(); } /** * 獲取連線客戶端 * * @return */ public CuratorFramework getInnerClient() { return this.client; } /** * 關閉連線 */ public void quit() { if ((null != this.client) && (CuratorFrameworkState.STARTED .equals(this.client.getState()))) { this.client.close(); } } /** * 刪除節點 * * @param path * @param deleChildren * @throws Exception */ public void deleteNode(String path, boolean deleChildren) throws Exception { if (deleChildren) { this.client.delete().guaranteed().deletingChildrenIfNeeded() .forPath(path); } else { this.client.delete().forPath(path); } } /** * 設定節點資料 * * @param nodePath * @param data * @throws Exception */ public void setNodeData(String nodePath, String data) throws Exception { byte[] bytes = null; if (!StringUtils.isBlank(data)) { bytes = data.getBytes("UTF-8"); } setNodeData(nodePath, bytes); } /** * 設定節點資料 * * @param nodePath * @param data * @throws Exception */ public void setNodeData(String nodePath, byte[] data) throws Exception { this.client.setData().forPath(nodePath, data); } public String getNodeData(String nodePath, boolean watch) throws Exception { byte[] data; if (watch) { data = (byte[]) ((BackgroundPathable<?>) this.client.getData() .watched()).forPath(nodePath); } else { data = (byte[]) this.client.getData().forPath(nodePath); } if ((null == data) || (data.length <= 0)) { return null; } return new String(data, "UTF-8"); } public String getNodeData(String nodePath) throws Exception { return getNodeData(nodePath, false); } public String getNodeData(String nodePath, Watcher watcher) throws Exception { byte[] data = getNodeBytes(nodePath, watcher); return new String(data, "UTF-8"); } public byte[] getNodeBytes(String nodePath, Watcher watcher) throws Exception { byte[] bytes = null; if (null != watcher) { bytes = (byte[]) ((BackgroundPathable<?>) this.client.getData() .usingWatcher(watcher)).forPath(nodePath); } else { bytes = (byte[]) this.client.getData().forPath(nodePath); } return bytes; } public byte[] getNodeBytes(String nodePath) throws Exception { return getNodeBytes(nodePath, null); } @SuppressWarnings("unchecked") public List<String> getChildren(String nodePath, Watcher watcher) throws Exception { return (List<String>) ((BackgroundPathable<?>) this.client .getChildren().usingWatcher(watcher)).forPath(nodePath); } public List<String> getChildren(String path) throws Exception { return (List<String>) this.client.getChildren().forPath(path); } @SuppressWarnings("unchecked") public List<String> getChildren(String path, boolean watcher) throws Exception { if (watcher) { return (List<String>) ((BackgroundPathable<?>) this.client .getChildren().watched()).forPath(path); } return (List<String>) this.client.getChildren().forPath(path); } public ZookeeperClient addAuth(String authSchema, String authInfo) throws Exception { synchronized (ZookeeperClient.class) { this.client.getZookeeperClient().getZooKeeper() .addAuthInfo(authSchema, authInfo.getBytes()); } return this; } /** * 分散式鎖 * * @param lockPath * @return */ public InterProcessLock getInterProcessLock(String lockPath) { return new InterProcessMutex(this.client, lockPath); } }
上面這段程式碼是zookeeper的客戶端以及所有分散式鎖操作的方法實現,下面的程式碼是如何利用分散式鎖來控制分散式事務。
//建立一個永久節點作為全域性鎖 public static void testCreateNode(){ try { ZookeeperClient zkClient = ZkClientUtils.getZkClient(address,"ns", null); zkClient.createPersitentNode("/test/qaz/t2/t/t", "data", true); } catch (Exception e) { e.printStackTrace(); } } //測試分散式鎖 public static void testDistributeLock(){ for(int i=0;i<50;i++){ new Thread(){ @Override public void run() { InterProcessLock lock = null; try{ ZookeeperClient zkClient = ZkClientUtils.getZkClient(address,"dislock", null); lock = zkClient.getInterProcessLock("/distributeLock"); System.out.println(Thread.currentThread().getName()+"申請鎖"); lock.acquire(); System.out.println(Thread.currentThread().getName()+"持有鎖"); Thread.sleep(500); } catch(Exception e){ e.printStackTrace(); } finally{ if(null != lock){ try { lock.release(); System.out.println(Thread.currentThread().getName()+"釋放有鎖"); } catch (Exception e) { e.printStackTrace(); } } } } }.start(); } }