zookeeper-分散式鎖的程式碼實現-【每日五分鐘搞定大資料】
阿新 • • 發佈:2018-12-14
本文涉及到幾個zookeeper簡單的知識點,永久節點、有序節點、watch機制。比較基礎,熟悉的就別看了跳過這篇吧
- 每個執行緒在/locks節點下建立一個臨時有序節點test_lock_0000000040
- 獲得/locks節點下所有子節點A、B、C,排序獲得最小值
- 若當前節點B為最小值則獲得鎖,執行業務邏輯
- 若當前節點B不是最小值則watch比自己小1的節點A,節點A存在則await,否則獲得鎖
總結:臨時有序節點排序後watch比自己小1的節點。
下面看程式碼
1.執行緒初始化
建立一個名字為lockName的永久節點(只有永久節點才可以建立子節點)
DistributedLock(String url, String lockName) { this.lockName = lockName; try { zkConn = new ZooKeeper(url, sessionTimeout, this); if (zkConn.exists(ROOT_LOCK, false) == null) zkConn.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (IOException | InterruptedException | KeeperException e) { e.printStackTrace(); } }
複習一下:zookeeper的節點有以下幾種型別:永久,永久有序,臨時,臨時有序
public enum CreateMode {
PERSISTENT(0, false, false),
PERSISTENT_SEQUENTIAL(2, false, true),
EPHEMERAL(1, true, false),
EPHEMERAL_SEQUENTIAL(3, true, true);
2.執行緒嘗試獲得鎖tryLock
建立臨時有序節點,會在你的lockName後面加上一串編號,例如/locks/test_lock_0000000035
CURRENT_LOCK = zkConn.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
獲取當前lockName下的所有子臨時節點
List<String> subNodes = zkConn.getChildren(ROOT_LOCK, false);
把名字都取出來放進list排個序
List<String> lockObjects = new ArrayList<>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) lockObjects.add(node);
}
若當前節點為最小節點,則直接獲取鎖成功
if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
return true;
}
若不是最小節點,開始等待,見下一步。
3.開始等待鎖
獲得當前執行緒需要等待的節點名
- 獲得當前節點的節點名即test_lock_0000000035
- 獲得當前幾點在所有排隊等鎖的執行緒中的排序
- 根據排序-1,獲得排在自己前面的執行緒的節點名
String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
int prevNodePosition = Collections.binarySearch(lockObjects, prevNode);
WAIT_LOCK = lockObjects.get(prevNodePosition - 1);
給exists加上watcher,監控當前執行緒等待的節點waitLock是否還存在
若存在即stat不為null,當前執行緒先開始await
同時watch執行緒也啟動了開始監控exists操作
若是exists狀態有變化了(即waitLock不存在了)觸發watch執行緒的countDown操作。
countDown操作使當前執行緒結束waiting,獲得鎖,開始繼續往後執行
private boolean waitForLock(String waitLock, long waitTime) throws KeeperException, InterruptedException {
Stat stat = zkConn.exists(ROOT_LOCK + "/" + waitLock, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
});
if (stat != null) {
System.out.println(Thread.currentThread().getName() + " is waiting for " + ROOT_LOCK + "/" + waitLock);
this.countDownLatch = new CountDownLatch(1);
this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
System.out.println(Thread.currentThread().getName() + " get the lock ");
}
return true;
}
完整的程式碼可以在我的公眾號後臺回覆9獲得,感謝閱讀。