淺談分散式鎖--基於Zookeeper實現篇
淺談分散式鎖--基於Zookeeper實現篇:
1、基於zookeeper臨時有序節點可以實現的分散式鎖。其實基於ZooKeeper,就是使用它的臨時有序節點來實現的分散式鎖。
來看下Zookeeper能不能解決前面提到的問題。
鎖無法釋放:使用Zookeeper可以有效的解決鎖無法釋放的問題,因為在建立鎖的時候,客戶端會在ZK中建立一個臨時節點,一旦客戶端獲取到鎖之後突然掛掉(Session連線斷開),那麼這個臨時節點就會自動刪除掉。其他客戶端就可以再次獲得鎖。
非阻塞鎖:使用Zookeeper可以實現阻塞的鎖,客戶端可以通過在ZK中建立順序節點,並且在節點上繫結監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端可以檢查自己建立的節點是不是當前所有節點中序號最小的,如果是,那麼自己就獲取到鎖,便可以執行業務邏輯了。
不可重入:
單點問題:使用Zookeeper可以有效的解決單點問題,ZK是叢集部署的,只要叢集中有半數以上的機器存活,就可以對外提供服務。
2、實現原理:
原理就是:當某客戶端要進行邏輯的加鎖時,就在zookeeper上的某個指定節點的目錄下,去生成一個唯一的臨時有序節點, 然後判斷自己是否是這些有序節點中序號最小的一個,如果是,則算是獲取了鎖。如果不是,則說明沒有獲取到鎖,那麼就需要在序列中找到比自己小的那個節點,並對其呼叫exist()方法,對其註冊事件監聽,當監聽到這個節點被刪除了,那就再去判斷一次自己當初建立的節點是否變成了序列中最小的。如果是,則獲取鎖,如果不是,則重複上述步驟。
如圖,locker是一個持久節點,node_1/node_2/…/node_n 就是上面說的臨時節點,由客戶端client去建立的。
client_1/client_2/…/clien_n 都是想去獲取鎖的客戶端。以client_1為例,它想去獲取分散式鎖,則需要跑到locker下面去建立臨時節點(假如是node_1)建立完畢後,
看一下自己的節點序號是否是locker下面最小的,如果是,則獲取了鎖。如果不是,則去找到比自己小的那個節點(假如是node_2),找到後,就監聽node_2,直到node_2被刪除,
那麼就開始再次判斷自己的node_1是不是序列中最小的,如果是,則獲取鎖,如果還不是,則繼續找一下一個節點。
3、優點
鎖安全性高,zk可持久化
4、缺點
效能開銷比較高。因為其需要動態產生、銷燬瞬時節點來實現鎖功能。
5、實現
可以直接採用zookeeper第三方庫curator即可方便地實現分散式鎖
6、zookeeper還有幾個特質,讓它非常適合作為分散式鎖服務。
zookeeper支援watcher機制,這樣實現阻塞鎖,可以watch鎖資料,等到資料被刪除,zookeeper會通知客戶端去重新競爭鎖。
zookeeper的資料可以支援臨時節點的概念,即客戶端寫入的資料是臨時資料,在客戶端宕機後,臨時資料會被刪除,這樣就實現了鎖的異常釋放。使用這樣的方式,就不需要給鎖增加超時自動釋放的特性了。
zookeeper實現鎖的方式是客戶端一起競爭寫某條資料,比如/path/lock,只有第一個客戶端能寫入成功,其他的客戶端都會寫入失敗。
寫入成功的客戶端就獲得了鎖,寫入失敗的客戶端,註冊watch事件,等待鎖的釋放,從而繼續競爭該鎖。
7、節點介紹:
1、PERSISTENT-持久化目錄節點
客戶端與Zookeeper斷開連線後,該節點依舊存在
2、PERSISTENT_SEQUENTIAL-持久化順序編號目錄節點
客戶端與Zookeeper斷開連線後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號
3、EPHEMERAL-臨時目錄節點
客戶端與Zookeeper斷開連線後,該節點被刪除
4、EPHEMERAL_SEQUENTIAL-臨時順序編號目錄節點
客戶端與Zookeeper斷開連線後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號
監視器(watcher):
當建立一個節點時,可以註冊一個該節點的監視器,當節點狀態發生改變時,watch被觸發時,ZooKeeper將會向客戶端傳送且僅傳送一條通知,因為watch只能被觸發一次。
8、如何利用這些特性來實現分散式鎖:
1. 建立一個鎖目錄lock。
2. 希望獲得鎖的執行緒A就在lock目錄下,建立臨時順序節點。
3. 獲取鎖目錄下所有的子節點,然後獲取比自己小的兄弟節點,如果不存在,則說明當前執行緒順序號最小,獲得鎖。
4. 執行緒B獲取所有節點,判斷自己不是最小節點,設定監聽(watcher)比自己次小的節點(只關注比自己次小的節點是為了防止發生“羊群效應”)。
5. 執行緒A處理完,刪除自己的節點,執行緒B監聽到變更事件,判斷自己是最小的節點,獲得鎖。
9、例項實現:
public class ZKLock implements Watcher {
private int threadId;
private ZooKeeper zk = null;
private String selfPath;
private String waitPath;
private String CURRENT_THREAD;
private static final String GROUP_PATH = "/disLocks";
private static final String SUB_PATH = "/disLocks/sub";
private static final String CONNECTION_STRING = "192.168.10.41:2181";
private static final int THREAD_NUM = 10;
// 確保連線zk成功;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
// 確保所有執行緒執行結束;
private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
private static final Logger LOG = LoggerFactory.getLogger(ZKLock.class);
public ZKLock(int id) {
this.threadId = id;
CURRENT_THREAD = "【第" + threadId + "個執行緒】";
}
public static void main(String[] args) {
for (int i = 0; i < THREAD_NUM; i++) {
final int threadId = i + 1;
new Thread() {
@Override
public void run() {
try {
ZKLock dc = new ZKLock(threadId);
dc.createZKConnection(CONNECTION_STRING, 1000);
// GROUP_PATH不存在的話,由一個執行緒建立即可;
synchronized (threadSemaphore) {
dc.createPath(GROUP_PATH, "該節點由執行緒" + threadId + "建立", true);
}
dc.getLock();
} catch (Exception e) {
LOG.error("【第" + threadId + "個執行緒】 丟擲的異常:");
e.printStackTrace();
}
}
}.start();
}
try {
threadSemaphore.await();
LOG.info("所有執行緒執行結束!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 建立節點
*
* @param path
* 節點path
* @param data
* 初始資料內容
* @return
*/
public boolean createPath(String path, String data, boolean needWatch)
throws KeeperException, InterruptedException {
if (zk.exists(path, needWatch) == null) {
LOG.info(CURRENT_THREAD + "節點建立成功, Path: "
+ this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+ ", content: " + data);
}
return true;
}
/**
* 建立ZK連線
*/
public void createZKConnection(String connectString, int sessionTimeout) throws IOException, InterruptedException {
zk = new ZooKeeper(connectString, sessionTimeout, this);
connectedSemaphore.await();
}
/**
* 獲取鎖
*/
private void getLock() throws KeeperException, InterruptedException {
try {
selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
}
LOG.info(CURRENT_THREAD + "建立鎖路徑:" + selfPath);
if (checkMinPath()) {
// 如果是最小節點獲得鎖
getLockSuccess();
}
}
/**
* 獲取鎖成功
*/
public void getLockSuccess() throws KeeperException, InterruptedException {
if (zk.exists(this.selfPath, false) == null) {
LOG.error(CURRENT_THREAD + "本節點已不在了...");
return;
}
LOG.info(CURRENT_THREAD + "獲取鎖成功!");
// 處理業務邏輯
Thread.sleep(2000);
unlock();
}
/**
* 釋放鎖
*/
public void unlock() {
LOG.info(CURRENT_THREAD + "刪除本節點:" + selfPath);
try {
zk.delete(this.selfPath, -1);
releaseConnection();
threadSemaphore.countDown();
System.out.println(CURRENT_THREAD + "節點的鎖已經釋放了!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 關閉ZK連線
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
}
}
LOG.info(CURRENT_THREAD + "釋放連線");
}
/**
* 檢查自己是不是最小的節點
*
* @return
*/
public boolean checkMinPath() throws KeeperException, InterruptedException {
List<String> subNodes = zk.getChildren(GROUP_PATH, false);
Collections.sort(subNodes);
int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));
switch (index) {
case -1: {
LOG.error(CURRENT_THREAD + "節點已不在了..." + selfPath);
return false;
}
case 0: {
LOG.info(CURRENT_THREAD + "節點可以獲得鎖了" + selfPath);
return true;
}
default: {
this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);
LOG.info(CURRENT_THREAD + "獲取子節點中,排在我前面的" + waitPath);
try {
zk.getData(waitPath, true, new Stat());
return false;
} catch (KeeperException e) {
if (zk.exists(waitPath, false) == null) {
LOG.info(CURRENT_THREAD + "子節點中,排在前面的" + waitPath + "已丟失");
return checkMinPath();
} else {
throw e;
}
}
}
}
}
@Override
public void process(WatchedEvent event) {
if (event == null) {
return;
}
Event.KeeperState keeperState = event.getState();
Event.EventType eventType = event.getType();
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
LOG.info(CURRENT_THREAD + "成功連線ZK伺服器");
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
LOG.info(CURRENT_THREAD + "排前面的節點丟失,判斷該節點是否可以獲得鎖?");
try {
if (checkMinPath()) {
getLockSuccess();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if (Event.KeeperState.Disconnected == keeperState) {
LOG.info(CURRENT_THREAD + "與ZK伺服器斷開連線");
} else if (Event.KeeperState.AuthFailed == keeperState) {
LOG.info(CURRENT_THREAD + "許可權檢查失敗");
} else if (Event.KeeperState.Expired == keeperState) {
LOG.info(CURRENT_THREAD + "會話失效");
}
}
}
10、總結:(建議採用的方式)
使用Zookeeper實現分散式鎖的優點
有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題。實現起來較為簡單。
使用Zookeeper實現分散式鎖的缺點
效能上不如使用快取實現分散式鎖。 需要對ZK的原理有所瞭解。