zookeeper — 實現分布式鎖
阿新 • • 發佈:2019-02-16
lean sta 會話期 競爭 all zookeeper scribe handle 獲取鎖
一.前言
在之前的文章中介紹過分布式鎖的特點和利用Redis實現簡單的分布式鎖。但是分布式鎖的實現還有很多其他方式,但是萬變不離其宗,始終遵循一個特點:同一時刻只能有一個操作獲取。這篇文章主要介紹如何基於zookeeper實現分布式鎖。
- zookeeper能夠作為分布式鎖實現的基礎
- 算法流程
- 實現
關於分布式鎖的相關特性,這裏不再贅述,請參考分布式鎖。
二.zookeeper能夠作為分布式鎖實現的基礎
這裏回顧下分布式鎖的特點:
- 每次只能一個占用鎖;
- 可以重復進入鎖;
- 只有占用者才可以解鎖;
- 獲取鎖和釋放鎖都需要原子
- 不能產生死鎖
- 盡量滿足性能
zookeeper中有一種臨時順序節點,它具有以下特征:
- 時效性,當會話結束,節點將自動被刪除
- 順序性,當多個應用向其註冊順序節點時,每個順序號將只能被一個應用獲取
利用以上的特點可以滿足分布式鎖實現的基本要求:
因為順序性,可以讓最小順序號的應用獲取到鎖,從而滿足分布式鎖的每次只能一個占用鎖,因為只有它一個獲取到,所以可以實現重復進入,只要設置標識即可。鎖的釋放,即刪除應用在zookeeper上註冊的節點,因為每個節點只被自己註冊擁有,所以只有自己才能刪除,這樣就滿足只有占用者才可以解鎖
zookeeper的序號分配是原子的,分配後即不會再改變,讓最小序號者獲取鎖,所以獲取鎖是原子的
因為註冊的是臨時節點,在會話期間內有效,所以不會產生死鎖
zookeeper註冊節點的性能能滿足幾千,而且支持集群,能夠滿足大部分情況下的性能
三.算法流程
1.獲取鎖
需要獲取分布式鎖的應用都向zookeeper的/lock/{resouce}目錄下註冊sequence-前綴的節點,序號最小者獲取到操作資源的權限:
Note:
這裏的resource需要依據競爭的具體資源確定,如競爭賬戶則可以使用賬戶號作為resource。
從圖中可以看出,clientA的順序號最小,由它獲取到鎖,操作資源。
算法步驟:
- client判斷/lock目錄是否存在,如果不存在則向其註冊/lock的持久節點
- client判斷/lock目錄下是否存在競爭的資源resouce目錄,如果不存在則向其註冊/lock/resource的持久節點
- client向/lock/resource目錄下註冊/lock/resource/sequence-前綴的臨時順序節點,並得到順序號
- client獲取/lock/resource目錄下的所有臨時順序子節點
- client判斷臨時子節點序號中是否存在比自身的序號小的節點。如果不存在,則獲取到鎖;如果存在,則對象該臨時節點做watch監控
- 如果收到監控的臨時節點被刪除的通知,則再重復4、5步驟,直到獲取到鎖
流程圖:
2.釋放鎖
因為最小的節點只被獲取到鎖的client持有,所以該鎖不可能被其他client釋放。同時釋放鎖只需要將臨時順序節點刪除,也是原子性操作。
三.實現
/**
* 基於Zookeeper實現分布式鎖
*
* @author huaijin
*/
public class DistributedLockBaseZookeeper implements DistributedLock {
private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);
/**
* 利用空串作為各個節點存儲的數據
*/
private static final String EMPTY_DATA = "";
/**
* 分布式鎖的根目錄
*/
private static final String LOCK_ROOT = "/lock";
/**
* zookeeper目錄分隔符
*/
private static final String PATH_SEPARATOR = "/";
/**
* 臨時順序節點前綴
*/
private static final String LOCK_NODE_PREFIX = "sequence-";
/**
* 利用Lock和Condition實現等待通知
*/
private Lock waitNotifierLock = new ReentrantLock();
private Condition waitNotifier = waitNotifierLock.newCondition();
/**
* 操作zookeeper的client
*/
private ZkClient zkClient;
/**
* 分布式資源的路徑
*/
private String resourcePath;
/**
* 鎖節點完整前綴
*/
private String lockNodePrefix;
/**
* 當前註冊的臨時順序節點路徑
*/
private String currentLockNodePath;
public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
Objects.requireNonNull(zkClient, "zkClient must not be null!");
if (resource == null || resource.isEmpty()) {
throw new IllegalArgumentException("resource must not be null!");
}
this.zkClient = zkClient;
this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;
// 創建分布式鎖根目錄
if (!this.zkClient.exists(LOCK_ROOT)) {
try {
this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
// ignore, logging
log.warn("The root path for lock already exists.");
}
}
// 創建資源目錄
if (!this.zkClient.exists(resourcePath)) {
try {
this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
// ignore, logging
log.warn("The resource path for [" + resourcePath + "] already exists.");
}
}
}
@Override
public void lock() throws DistributedLockException {
if (!acquireLock()) {
// 如果獲取鎖不成功,則等待
waitNotifierLock.lock();
try {
waitNotifier.await();
} catch (Exception e) {
throw new DistributedLockException("Interrupt when waiting notification.");
} finally {
waitNotifierLock.unlock();
}
}
}
@Override
public void unlock() {
// 刪除自身節點,釋放鎖
zkClient.delete(currentLockNodePath);
}
private boolean acquireLock() throws DistributedLockException {
// 如果當前未註冊臨時順序節點,則註冊
if (this.currentLockNodePath == null) {
this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
}
// 獲取順序號
long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
// 獲取所有子節點
List<String> childNodePaths = zkClient.getChildren(resourcePath);
if (childNodePaths == null || childNodePaths.isEmpty()) {
throw new DistributedLockException("Not exists child nodes.");
}
// 從所有子節點中獲取最小子節點的順序號
long minSeq = 1000000L;
int minIndex = -1;
for (int i = 0; i < childNodePaths.size(); i++) {
long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
if (nodeSeq < minSeq) {
minSeq = nodeSeq;
minIndex = i;
}
}
// 比較自身順序號與最小序號
if (lockNodeSeq > minSeq) {
// 如果存在更小序號,則監控最小序號的子節點
String minLockNodePath = childNodePaths.get(minIndex);
zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
new ListenerForLockRelease());
return false;
}
// 成功獲取鎖,返回
return true;
}
private long fetchSeqFromNodePath(String nodePath) {
String seq = nodePath.substring(lockNodePrefix.length());
return Long.valueOf(seq);
}
private class ListenerForLockRelease implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 如果成功獲取鎖,則通知,讓主線程返回
if (acquireLock()) {
waitNotifierLock.lock();
try {
waitNotifier.signal();
} finally {
waitNotifierLock.unlock();
}
}
}
}
}
zookeeper — 實現分布式鎖