1. 程式人生 > >基於Zookeeper實現的分布式互斥鎖 - InterProcessMutex

基於Zookeeper實現的分布式互斥鎖 - InterProcessMutex

function 可重入 lse containe ntc map delet mode start

CuratorZooKeeper的一個客戶端框架,其中封裝了分布式互斥鎖的實現,最為常用的是InterProcessMutex,本文將對其進行代碼剖析

簡介

InterProcessMutex基於Zookeeper實現了分布式的公平可重入互斥鎖,類似於單個JVM進程內的ReentrantLock(fair=true)

構造函數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 最常用
public InterProcessMutex(CuratorFramework client, String path){
// Zookeeper利用path創建臨時順序節點,實現公平鎖的核心
this(client, path, new StandardLockInternalsDriver());
}

public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){
// maxLeases=1,表示可以獲得分布式鎖的線程數量(跨JVM)為1,即為互斥鎖
this(client, path, LOCK_NAME, 1, driver);
}

// protected構造函數
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver){
basePath = PathUtils.validatePath(path);
// internals的類型為LockInternals,InterProcessMutex將分布式鎖的申請和釋放操作委托給internals執行
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

獲取鎖

InterProcessMutex.acquire

1
2
3
4
5
6
7
8
9
10
11
// 無限等待
public void acquire() throws Exception{
if ( !internalLock(-1, null) ){
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}

// 限時等待
public boolean acquire(long time, TimeUnit unit) throws Exception{
return internalLock(time, unit);
}

InterProcessMutex.internalLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean internalLock(long time, TimeUnit unit) throws Exception{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null ){
// 實現可重入
// 同一線程再次acquire,首先判斷當前的映射表內(threadData)是否有該線程的鎖信息,如果有則原子+1,然後返回
lockData.lockCount.incrementAndGet();
return true;
}

// 映射表內沒有對應的鎖信息,嘗試通過LockInternals獲取鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null ){
// 成功獲取鎖,記錄信息到映射表
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
1
2
3
// 映射表
// 記錄線程與鎖信息的映射關系
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
1
2
3
4
5
6
7
8
9
10
11
12
// 鎖信息
// Zookeeper中一個臨時順序節點對應一個“鎖”,但讓鎖生效激活需要排隊(公平鎖),下面會繼續分析
private static class LockData{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1); // 分布式鎖重入次數

private LockData(Thread owningThread, String lockPath){
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}

LockInternals.attemptLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 嘗試獲取鎖,並返回鎖對應的Zookeeper臨時順序節點的路徑
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
final long startMillis = System.currentTimeMillis();
// 無限等待時,millisToWait為null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
// 創建ZNode節點時的數據內容,無關緊要,這裏為null,采用默認值(IP地址)
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
// 當前已經重試次數,與CuratorFramework的重試策略有關
int retryCount = 0;

// 在Zookeeper中創建的臨時順序節點的路徑,相當於一把待激活的分布式鎖
// 激活條件:同級目錄子節點,名稱排序最小(排隊,公平鎖),後續繼續分析
String ourPath = null;
// 是否已經持有分布式鎖
boolean hasTheLock = false;
// 是否已經完成嘗試獲取分布式鎖的操作
boolean isDone = false;

while ( !isDone ){
isDone = true;
try{
// 從InterProcessMutex的構造函數可知實際driver為StandardLockInternalsDriver的實例
// 在Zookeeper中創建臨時順序節點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 循環等待來激活分布式鎖,實現鎖的公平性,後續繼續分析
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch ( KeeperException.NoNodeException e ) {
// 容錯處理,不影響主邏輯的理解,可跳過
// 因為會話過期等原因,StandardLockInternalsDriver因為無法找到創建的臨時順序節點而拋出NoNodeException異常
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){
// 滿足重試策略嘗試重新獲取鎖
isDone = false;
} else {
// 不滿足重試策略則繼續拋出NoNodeException
throw e;
}
}
}
if ( hasTheLock ){
// 成功獲得分布式鎖,返回臨時順序節點的路徑,上層將其封裝成鎖信息記錄在映射表,方便鎖重入
return ourPath;
}
// 獲取分布式鎖失敗,返回null
return null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// From StandardLockInternalsDriver
// 在Zookeeper中創建臨時順序節點
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{
String ourPath;
// lockNodeBytes不為null則作為數據節點內容,否則采用默認內容(IP地址)
if ( lockNodeBytes != null ){
// 下面對CuratorFramework的一些細節做解釋,不影響對分布式鎖主邏輯的解釋,可跳過
// creatingParentContainersIfNeeded:用於創建父節點,如果不支持CreateMode.CONTAINER
// 那麽將采用CreateMode.PERSISTENT
// withProtection:臨時子節點會添加GUID前綴
ourPath = client.create().creatingParentContainersIfNeeded()
// CreateMode.EPHEMERAL_SEQUENTIAL:臨時順序節點,Zookeeper能保證在節點產生的順序性
// 依據順序來激活分布式鎖,從而也實現了分布式鎖的公平性,後續繼續分析
.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
ourPath = client.create().creatingParentContainersIfNeeded()
.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}

LockInternals.internalLockLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 循環等待來激活分布式鎖,實現鎖的公平性
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
// 是否已經持有分布式鎖
boolean haveTheLock = false;
// 是否需要刪除子節點
boolean doDelete = false;
try {
if (revocable.get() != null) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}

while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
// 獲取排序後的子節點列表
List<String> children = getSortedChildren();
// 獲取前面自己創建的臨時順序子節點的名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 實現鎖的公平性的核心邏輯,看下面的分析
PredicateResults predicateResults = driver.getsTheLock(client,
children , sequenceNodeName , maxLeases);
if (predicateResults.getsTheLock()) {
// 獲得了鎖,中斷循環,繼續返回上層
haveTheLock = true;
} else {
// 沒有獲得到鎖,監聽上一臨時順序節點
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
try {
// exists()會導致導致資源泄漏,因此exists()可以監聽不存在的ZNode,因此采用getData()
// 上一臨時順序節點如果被刪除,會喚醒當前線程繼續競爭鎖,正常情況下能直接獲得鎖,因為鎖是公平的
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true; // 獲取鎖超時,標記刪除之前創建的臨時順序節點
break;
}
wait(millisToWait); // 等待被喚醒,限時等待
} else {
wait(); // 等待被喚醒,無限等待
}
} catch (KeeperException.NoNodeException e) {
// 容錯處理,邏輯稍微有點繞,可跳過,不影響主邏輯的理解
// client.getData()可能調用時拋出NoNodeException,原因可能是鎖被釋放或會話過期(連接丟失)等
// 這裏並沒有做任何處理,因為外層是while循環,再次執行driver.getsTheLock時會調用validateOurIndex
// 此時會拋出NoNodeException,從而進入下面的catch和finally邏輯,重新拋出上層嘗試重試獲取鎖並刪除臨時順序節點
}
}
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
// 標記刪除,在finally刪除之前創建的臨時順序節點(後臺不斷嘗試)
doDelete = true;
// 重新拋出,嘗試重新獲取鎖
throw e;
} finally {
if (doDelete) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// From StandardLockInternalsDriver
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception{
// 之前創建的臨時順序節點在排序後的子節點列表中的索引
int ourIndex = children.indexOf(sequenceNodeName);
// 校驗之前創建的臨時順序節點是否有效
validateOurIndex(sequenceNodeName, ourIndex);
// 鎖公平性的核心邏輯
// 由InterProcessMutex的構造函數可知,maxLeases為1,即只有ourIndex為0時,線程才能持有鎖,或者說該線程創建的臨時順序節點激活了鎖
// Zookeeper的臨時順序節點特性能保證跨多個JVM的線程並發創建節點時的順序性,越早創建臨時順序節點成功的線程會更早地激活鎖或獲得鎖
boolean getsTheLock = ourIndex < maxLeases;
// 如果已經獲得了鎖,則無需監聽任何節點,否則需要監聽上一順序節點(ourIndex-1)
// 因為鎖是公平的,因此無需監聽除了(ourIndex-1)以外的所有節點,這是為了減少羊群效應,非常巧妙的設計!!
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
// 返回獲取鎖的結果,交由上層繼續處理(添加監聽等操作)
return new PredicateResults(pathToWatch, getsTheLock);
}

static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException{
if ( ourIndex < 0 ){
// 容錯處理,可跳過
// 由於會話過期或連接丟失等原因,該線程創建的臨時順序節點被Zookeeper服務端刪除,往外拋出NoNodeException
// 如果在重試策略允許範圍內,則進行重新嘗試獲取鎖,這會重新重新生成臨時順序節點
// 佩服Curator的作者將邊界條件考慮得如此周到!
throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
}
}
1
2
3
4
5
6
7
8
9
10
// From LockInternals
private final Watcher watcher = new Watcher(){
@Override
public void process(WatchedEvent event){
notifyFromWatcher();
}
};
private synchronized void notifyFromWatcher(){
notifyAll(); // 喚醒所有等待LockInternals實例的線程
}
1
2
3
4
5
6
7
8
9
10
// From LockInternals
private void deleteOurPath(String ourPath) throws Exception{
try{
// 後臺不斷嘗試刪除
client.delete().guaranteed().forPath(ourPath);
} catch ( KeeperException.NoNodeException e ) {
// 已經刪除(可能會話過期導致),不做處理
// 實際使用Curator-2.12.0時,並不會拋出該異常
}
}

釋放鎖

弄明白了獲取鎖的原理,釋放鎖的邏輯就很清晰了

InterProcessMutex.release

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void release() throws Exception{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null ){
// 無法從映射表中獲取鎖信息,不持有鎖
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}

int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ){
// 鎖是可重入的,初始值為1,原子-1到0,鎖才釋放
return;
}
if ( newLockCount < 0 ){
// 理論上無法執行該路徑
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try{
// lockData != null && newLockCount == 0,釋放鎖資源
internals.releaseLock(lockData.lockPath);
} finally {
// 最後從映射表中移除當前線程的鎖信息
threadData.remove(currentThread);
}
}

LockInternals.releaseLock

1
2
3
4
5
void releaseLock(String lockPath) throws Exception{
revocable.set(null);
// 刪除臨時順序節點,只會觸發後一順序節點去獲取鎖,理論上不存在競爭,只排隊,非搶占,公平鎖,先到先得
deleteOurPath(lockPath);
}
1
2
3
4
5
6
7
8
9
10
// Class:LockInternals
private void deleteOurPath(String ourPath) throws Exception{
try{
// 後臺不斷嘗試刪除
client.delete().guaranteed().forPath(ourPath);
} catch ( KeeperException.NoNodeException e ) {
// 已經刪除(可能會話過期導致),不做處理
// 實際使用Curator-2.12.0時,並不會拋出該異常
}
}

總結

InterProcessMutex的特性

  1. 分布式鎖(基於Zookeeper
  2. 互斥鎖
  3. 公平鎖(監聽上一臨時順序節點 + wait() / notifyAll()
  4. 可重入

基於Zookeeper實現的分布式互斥鎖 - InterProcessMutex