1. 程式人生 > >基於ZooKeeper原生API實現分散式鎖

基於ZooKeeper原生API實現分散式鎖

其實實現分散式鎖主要需要有一個第三方中介軟體能夠提供鎖的儲存和鎖的釋放。像資料庫、Redis、ZooKeeper都是常用的分散式鎖解決方案。

分析

根據ZK節點的特性,在同一時間內,只會有一個客戶端建立/Locks/lock節點成功,失敗的節點則會監聽/Locks/lock節點的變化:

當/Locks/lock發生變化後,會觸發事件的監聽機制,隨後ClientB和ClientA客戶端會重新再去爭搶這把鎖。這是一個比較簡單的實現分散式鎖的思想,但是這個會產生“驚群效應”,在併發量較高的情況下,也就是說短時間之內會有大量的客戶端去爭搶這把鎖,短時間內會發生大量的事件上下文變更,但是實際上只有一個客戶端可以搶到鎖,相當於出現了大量的無效的系統排程、上下文切換,系統系能大打折扣。

為了解決這個問題,我們可以利用ZooKeeper中有序節點的特性。每個客戶端都去/Locks節點下建立一個有序節點/Locks/lock_seq_000X,這樣每一個客戶端都與一個有序節點有了關聯關係。如果要獲得鎖的話,只需要從這些所有的有序節點中獲得一個最小的子節點,也就是說這個節點對應的客戶端可以獲得鎖。其他的次於最小節點的節點只監聽比自己小1的節點:

當一個節點發生變化,監聽它的節點能夠收到這樣一個變化。再去判斷當前節點是不是所有節點中最小的節點,如果是的就獲得鎖,不是的就繼續等待,直到上一個節點釋放。這樣就避免了驚群效應。

程式碼示例

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 擴充套件Lock介面實現分散式鎖
 *
 * @author Dongguabai
 * @date 2018/10/30 11:36
 */
@Slf4j
public class DistributedLock implements Lock, Watcher {

    private ZooKeeper zk = null;

    /**
     * 定義根節點
     */
    private String ROOT_LOCK = "/locks";

    /**
     * 表示等待前一個鎖
     */
    private String WAIT_LOCK;

    /**
     * 表示當前鎖
     */
    private String CURRENT_LOCK;

    /**
     * 主要用作控制
     */
    private CountDownLatch countDownLatch;

    public DistributedLock() {
        try {
            zk = new ZooKeeper("192.168.220.136,192.168.220.137", 4000, this);
            //為false就不去註冊當前的事件
            Stat stat = zk.exists(ROOT_LOCK, false);
            //判斷當前根節點是否存在
            if (stat == null) {
                //建立持久化節點
                zk.create(ROOT_LOCK, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            log.error("初始化分散式鎖異常!!", e);
        }
    }

    @Override
    public void lock() {
        if (tryLock()){
            //如果獲得鎖成功
            log.info(Thread.currentThread().getName()+"-->"+CURRENT_LOCK+"|獲得鎖成功!恭喜!");
            return;
        }
        //如果沒有獲得鎖,那麼就繼續監聽,等待獲得鎖
        try {
            waitForLock(WAIT_LOCK);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 持續阻塞獲得鎖的過程
     * @param prev 當前節點的前一個等待節點
     * @return
     */
    private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
        //等待鎖需要監聽上一個節點,設定Watcher為true,即每一個有序節點都去監聽它的上一個節點
        Stat stat = zk.exists(prev,true);
        if (stat!=null){
            //即如果上一個節點依然存在的話
            log.info(Thread.currentThread().getName()+"-->等待鎖 "+ROOT_LOCK+"/"+prev+"釋放。");
            countDownLatch = new CountDownLatch(1);
            countDownLatch.await();
            log.info(Thread.currentThread().getName()+"-->"+"等待後獲得鎖成功!");
        }
        return true;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            //建立臨時有序節點(節點會自動遞增)-當前鎖
            CURRENT_LOCK = zk.create(ROOT_LOCK + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            log.info(Thread.currentThread().getName()+"-->"+CURRENT_LOCK+"|嘗試競爭鎖!");
            //獲取根節點下所有的子節點,不註冊監聽
            List<String> children = zk.getChildren(ROOT_LOCK, false);
            //排序
            SortedSet<String> sortedSet = new TreeSet<>();
            children.forEach(child->{
                sortedSet.add(ROOT_LOCK+"/"+child);
            });
            //獲取當前子節點中最小的節點
            String firstNode = sortedSet.first();
            if (StringUtils.equals(firstNode,CURRENT_LOCK)){
                //將當前節點和最小節點進行比較,如果相等,則獲得鎖成功
                return true;
            }
            //獲取當前節點中所有比自己更小的節點
            SortedSet<String> lessThenMe = sortedSet.headSet(CURRENT_LOCK);
            //如果當前所有節點中有比自己更小的節點
            if (CollectionUtils.isNotEmpty(lessThenMe)){
                //獲取比自己小的節點中的最後一個節點,設定為等待鎖
                WAIT_LOCK = lessThenMe.last();
            }
            return false;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        log.info(Thread.currentThread().getName()+"-->釋放鎖 "+CURRENT_LOCK);
        try {
            //-1強制刪除
            zk.delete(CURRENT_LOCK,-1);
            CURRENT_LOCK = null;
            zk.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @NotNull
    @Override
    public Condition newCondition() {
        return null;
    }

    /**
     * 監聽事件
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        if (this.countDownLatch!=null){
            //如果不為null說明存在這樣的監聽
            this.countDownLatch.countDown();
        }
    }
}

測試程式碼: