1. 程式人生 > >求你了,別再問我Zookeeper如何實現分散式鎖了!!!

求你了,別再問我Zookeeper如何實現分散式鎖了!!!

導讀

  • 真是有人()的地方就有江湖(事務),今天不談江湖,來撩撩人。

  • 分散式鎖的概念、為什麼使用分散式鎖,想必大家已經很清楚了。前段時間作者寫過Redis是如何實現分散式鎖,今天這篇文章來談談Zookeeper是如何實現分散式鎖的。

  • 陳某今天分別從如下幾個方面來詳細講講ZK如何實現分散式鎖:

    1. ZK的四種節點

    2. 排它鎖的實現

    3. 讀寫鎖的實現

    4. Curator實現分步式鎖

ZK的四種節點

  • 永續性節點:節點建立後將會一直存在

  • 臨時節點:臨時節點的生命週期和當前會話繫結,一旦當前會話斷開臨時節點也會刪除,當然可以主動刪除。

  • 持久有序節點:節點建立一直存在,並且zk會自動為節點加上一個自增的字尾作為新的節點名稱。

  • 臨時有序節點:保留臨時節點的特性,並且zk會自動為節點加上一個自增的字尾作為新的節點名稱。

 

排它鎖的實現

  • 排他鎖的實現相對簡單一點,利用了zk的建立節點不能重名的特性。如下圖:

  • 根據上圖分析大致分為如下步驟:

    1. 嘗試獲取鎖:建立臨時節點,zk會保證只有一個客戶端建立成功。

    2. 建立臨時節點成功,獲取鎖成功,執行業務邏輯,業務執行完成後刪除鎖。

    3. 建立臨時節點失敗,阻塞等待。

    4. 監聽刪除事件,一旦臨時節點刪除了,表示互斥操作完成了,可以再次嘗試獲取鎖。

    5. 遞迴:獲取鎖的過程是一個遞迴的操作,獲取鎖->監聽->獲取鎖

  • 如何避免死鎖:建立的是臨時節點,當服務宕機會話關閉後臨時節點將會被刪除,鎖自動釋放。

程式碼實現

  • 作者參照JDK鎖的實現方式加上模板方法模式的封裝,封裝介面如下:

/**
 * @Description ZK分散式鎖的介面
 * @Author 陳某
 * @Date 2020/4/7 22:52
 */
public interface ZKLock {
    /**
     * 獲取鎖
     */
    void lock() throws Exception;
​
    /**
     * 解鎖
     */
    void unlock() throws Exception;
}

 

  • 模板抽象類如下:

/**
 * @Description 排他鎖,模板類
 * @Author 陳某
 * @Date 2020/4/7 22:55
 */
public abstract class AbstractZKLockMutex implements ZKLock {
​
    /**
     * 節點路徑
     */
    protected String lockPath;
​
    /**
     * zk客戶端
     */
    protected CuratorFramework zkClient;
​
    private AbstractZKLockMutex(){}
​
    public AbstractZKLockMutex(String lockPath,CuratorFramework client){
        this.lockPath=lockPath;
        this.zkClient=client;
    }
​
    /**
     * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現
     * @throws Exception
     */
    @Override
    public final void lock() throws Exception {
        //獲取鎖成功
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"獲取鎖成功");
        }else{  //獲取鎖失敗
            //阻塞一直等待
            waitLock();
            //遞迴,再次獲取鎖
            lock();
        }
    }
​
    /**
     * 嘗試獲取鎖,子類實現
     */
    protected abstract boolean tryLock() ;
​
​
    /**
     * 等待獲取鎖,子類實現
     */
    protected abstract void waitLock() throws Exception;
​
​
    /**
     * 解鎖:刪除節點或者直接斷開連線
     */
    @Override
    public  abstract void unlock() throws Exception;
}

 

  • 排他鎖的具體實現類如下:

/**
 * @Description 排他鎖的實現類,繼承模板類 AbstractZKLockMutex
 * @Author 陳某
 * @Date 2020/4/7 23:23
 */
@Data
public class ZKLockMutex extends AbstractZKLockMutex {
​
    /**
     * 用於實現執行緒阻塞
     */
    private CountDownLatch countDownLatch;
​
    public ZKLockMutex(String lockPath,CuratorFramework zkClient){
        super(lockPath,zkClient);
    }
​
    /**
     * 嘗試獲取鎖:直接建立一個臨時節點,如果這個節點存在建立失敗丟擲異常,表示已經互斥了,
     * 反之建立成功
     * @throws Exception
     */
    @Override
    protected boolean tryLock()  {
        try {
            zkClient.create()
                    //臨時節點
                    .withMode(CreateMode.EPHEMERAL)
                    //許可權列表 world:anyone:crdwa
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(lockPath,"lock".getBytes());
            return true;
        }catch (Exception ex){
            return false;
        }
    }
​
​
    /**
     * 等待鎖,一直阻塞監聽
     * @return  成功獲取鎖返回true,反之返回false
     */
    @Override
    protected void waitLock() throws Exception {
        //監聽節點的新增、更新、刪除
        final NodeCache nodeCache = new NodeCache(zkClient, lockPath);
        //啟動監聽
        nodeCache.start();
        ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
​
        //監聽器
        NodeCacheListener listener=()-> {
            //節點被刪除,此時獲取鎖
            if (nodeCache.getCurrentData() == null) {
                //countDownLatch不為null,表示節點存在,此時監聽到節點刪除了,因此-1
                if (countDownLatch != null)
                    countDownLatch.countDown();
            }
        };
        //新增監聽器
        listenable.addListener(listener);
​
        //判斷節點是否存在
        Stat stat = zkClient.checkExists().forPath(lockPath);
        //節點存在
        if (stat!=null){
            countDownLatch=new CountDownLatch(1);
            //阻塞主執行緒,監聽
            countDownLatch.await();
        }
        //移除監聽器
        listenable.removeListener(listener);
    }
​
    /**
     * 解鎖,直接刪除節點
     * @throws Exception
     */
    @Override
    public void unlock() throws Exception {
        zkClient.delete().forPath(lockPath);
    }
}

 

可重入性排他鎖如何設計

  • 可重入的邏輯很簡單,在本地儲存一個ConcurrentMapkey是當前執行緒,value是定義的資料,結構如下:

 private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

 

  • 重入的虛擬碼如下:

public boolean tryLock(){
    //判斷當前執行緒是否在threadData儲存過
    //存在,直接return true
    //不存在執行獲取鎖的邏輯
    //獲取成功儲存在threadData中
}

 

 

讀寫鎖的實現

  • 讀寫鎖分為讀鎖和寫鎖,區別如下:

    • 讀鎖允許多個執行緒同時讀資料,但是在讀的同時不允許寫執行緒修改。

    • 寫鎖在獲取後,不允許多個執行緒同時寫或者讀。

  • 如何實現讀寫鎖?ZK中有一類節點叫臨時有序節點,上文有介紹。下面我們來利用臨時有序節點來實現讀寫鎖的功能。

 

讀鎖的設計

  • 讀鎖允許多個執行緒同時進行讀,並且在讀的同時不允許執行緒進行寫操作,實現原理如下圖:

  • 根據上圖,獲取一個讀鎖分為以下步驟:

    1. 建立臨時有序節點(當前執行緒擁有的讀鎖或稱作讀節點)。

    2. 獲取路徑下所有的子節點,並進行從小到大排序

    3. 獲取當前節點前的臨近寫節點(寫鎖)。

    4. 如果不存在的臨近寫節點,則成功獲取讀鎖。

    5. 如果存在臨近寫節點,對其監聽刪除事件。

    6. 一旦監聽到刪除事件,重複2,3,4,5的步驟(遞迴)。

 

寫鎖的設計

  • 執行緒一旦獲取了寫鎖,不允許其他執行緒讀和寫。實現原理如下:

 

  • 從上圖可以看出唯一和寫鎖不同的就是監聽的節點,這裡是監聽臨近節點(讀節點或者寫節點),讀鎖只需要監聽寫節點,步驟如下:

    1. 建立臨時有序節點(當前執行緒擁有的寫鎖或稱作寫節點)。

    2. 獲取路徑下的所有子節點,並進行從小到大排序。

    3. 獲取當前節點的臨近節點(讀節點和寫節點)。

    4. 如果不存在臨近節點,則成功獲取鎖。

    5. 如果存在臨近節點,對其進行監聽刪除事件。

    6. 一旦監聽到刪除事件,重複2,3,4,5的步驟(遞迴)。

 

如何監聽

  • 無論是寫鎖還是讀鎖都需要監聽前面的節點,不同的是讀鎖只監聽臨近的寫節點,寫鎖是監聽臨近的所有節點,抽象出來看其實是一種鏈式的監聽,如下圖:

  • 每一個節點都在監聽前面的臨近節點,一旦前面一個節點刪除了,再從新排序後監聽前面的節點,這樣遞迴下去。

 

程式碼實現

  • 作者簡單的寫了讀寫鎖的實現,先造出來再優化,不建議用在生產環境。程式碼如下:

public class ZKLockRW  {
​
    /**
     * 節點路徑
     */
    protected String lockPath;
​
    /**
     * zk客戶端
     */
    protected CuratorFramework zkClient;
​
    /**
     * 用於阻塞執行緒
     */
    private CountDownLatch countDownLatch=new CountDownLatch(1);
​
​
    private final static String WRITE_NAME="_W_LOCK";
​
    private final static String READ_NAME="_R_LOCK";
​
​
    public ZKLockRW(String lockPath, CuratorFramework client) {
        this.lockPath=lockPath;
        this.zkClient=client;
    }
​
    /**
     * 獲取鎖,如果獲取失敗一直阻塞
     * @throws Exception
     */
    public void lock() throws Exception {
        //建立節點
        String node = createNode();
        //阻塞等待獲取鎖
        tryLock(node);
        countDownLatch.await();
    }
​
    /**
     * 建立臨時有序節點
     * @return
     * @throws Exception
     */
    private String createNode() throws Exception {
        //建立臨時有序節點
       return zkClient.create()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(lockPath);
    }
​
    /**
     * 獲取寫鎖
     * @return
     */
    public  ZKLockRW writeLock(){
        return new ZKLockRW(lockPath+WRITE_NAME,zkClient);
    }
​
    /**
     * 獲取讀鎖
     * @return
     */
    public  ZKLockRW readLock(){
        return new ZKLockRW(lockPath+READ_NAME,zkClient);
    }
​
    private void tryLock(String nodePath) throws Exception {
        //獲取所有的子節點
        List<String> childPaths = zkClient.getChildren()
                .forPath("/")
                .stream().sorted().map(o->"/"+o).collect(Collectors.toList());
​
​
        //第一個節點就是當前的鎖,直接獲取鎖。遞迴結束的條件
        if (nodePath.equals(childPaths.get(0))){
            countDownLatch.countDown();
            return;
        }
​
        //1. 讀鎖:監聽最前面的寫鎖,寫鎖釋放了,自然能夠讀了
        if (nodePath.contains(READ_NAME)){
            //查詢臨近的寫鎖
            String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath));
            if (preNode==null){
                countDownLatch.countDown();
                return;
            }
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                //節點刪除事件
                if (nodeCache.getCurrentData()==null){
                    //繼續監聽前一個節點
                    String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode));
                    if (nearWriteNode==null){
                        countDownLatch.countDown();
                        return;
                    }
                    tryLock(nearWriteNode);
                }
            });
        }
​
        //如果是寫鎖,前面無論是什麼鎖都不能讀,直接迴圈監聽上一個節點即可,直到前面無鎖
        if (nodePath.contains(WRITE_NAME)){
            String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1);
            NodeCache nodeCache=new NodeCache(zkClient,preNode);
            nodeCache.start();
            ListenerContainer<NodeCacheListener> listenable = nodeCache.getListenable();
            listenable.addListener(() -> {
                //節點刪除事件
                if (nodeCache.getCurrentData()==null){
                    //繼續監聽前一個節點
                    tryLock(childPaths.get(childPaths.indexOf(preNode) - 1<0?0:childPaths.indexOf(preNode) - 1));
                }
            });
        }
    }
​
    /**
     * 查詢臨近的寫節點
     * @param childPath 全部的子節點
     * @param index 右邊界
     * @return
     */
    private String  getNearWriteNode(List<String> childPath,Integer index){
        for (int i = 0; i < index; i++) {
            String node = childPath.get(i);
            if (node.contains(WRITE_NAME))
                return node;
​
        }
        return null;
    }
​
}

 

Curator實現分步式鎖

  • Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。

  • Curator在分散式鎖方面已經為我們封裝好了,大致實現的思路就是按照作者上述的思路實現的。中小型網際網路公司還是建議直接使用框架封裝好的,畢竟穩定,有些大型的互聯公司都是手寫的,牛逼啊。

  • 建立一個排他鎖很簡單,如下:

//arg1:CuratorFramework連線物件,arg2:節點路徑
lock=new InterProcessMutex(client,path);
//獲取鎖
lock.acquire();
//釋放鎖
lock.release();

 

  • 更多的API請參照官方文件,不是此篇文章重點。

  • 至此ZK實現分散式鎖就介紹完了,如有想要原始碼的朋友,老規矩,關注微信公眾號【碼猿技術專欄】,回覆關鍵詞分散式鎖獲取。

一點小福利

  • 對於Zookeeper不太熟悉的朋友,陳某特地花費兩天時間總結了ZK的常用知識點,包括ZK常用shell命令、ZK許可權控制、Curator的基本操作API。目錄如下:

  • 需要上面PDF檔案的朋友,老規矩,關注微信公眾號【碼猿技術專欄】回覆關鍵詞ZK總結