分散式鎖原始碼剖析(4) zookeeper實現分散式鎖
阿新 • • 發佈:2018-12-27
zookeeper分散式鎖(curator)
maven配置檔案:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
程式碼示例:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient( "192.168.31.184:2181,192.168.31.207:2181,192.168.31.192:2181", retryPolicy); client.start(); InterProcessMutex lock = new InterProcessMutex(client, "/locks/lock_01"); lock.acquire();
核心原始碼:
建立一個臨時有序節點,然後獲取到自己在節點中的位置,如果是在第0位,則獲取鎖成功,把節點資訊放入ConcurrentMap<Thread, LockData> threadData這個map中。如果不是第0位,則wait等待,並註冊一個watch,對它上一個節點進行監控(監控上一個節點是否存在)。
private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) {
ourPath =client.create().creatingParentsIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
asTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
釋放鎖主要邏輯:刪除當前持有鎖的臨時節點,並移除當前執行緒的LockData。
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
try
{internals.releaseLock(lockData.lockPath);}
finally
{threadData.remove(currentThread);}
}