1. 程式人生 > >Zookeeper 實現分散式鎖(樂觀和悲觀)

Zookeeper 實現分散式鎖(樂觀和悲觀)

說明:
做備忘用,大家之言彙總到一起。
Jar

<!-- zkclient依賴 -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

這裡寫圖片描述

zookeeper基礎鞏固

ZooKeeper 節點是有生命週期的,這取決於節點的型別。在 ZooKeeper 中,節點型別可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL ),具體在節點建立過程中,一般是組合使用,可以生成以下 4 種節點型別。

持久節點(PERSISTENT)

所謂持久節點,是指在節點建立後,就一直存在,直到有刪除操作來主動清除這個節點——不會因為建立該節點的客戶端會話失效而消失。

持久順序節點(PERSISTENT_SEQUENTIAL)

這類節點的基本特性和上面的節點型別是一致的。額外的特性是,在ZK中,每個父節點會為他的第一級子節點維護一份時序,會記錄每個子節點建立的先後順序。基於這個特性,在建立子節點的時候,可以設定這個屬性,那麼在建立節點過程中,ZK會自動為給定節點名加上一個數字字尾,作為新的節點名。這個數字字尾的範圍是整型的最大值。

臨時節點(EPHEMERAL)

和持久節點不同的是,臨時節點的生命週期和客戶端會話繫結。也就是說,如果客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連線斷開。另外,在臨時節點下面不能建立子節點。

臨時順序節點(EPHEMERAL_SEQUENTIAL)

可以用來實現分散式鎖

這裡寫圖片描述

程式碼

業務程式碼-模擬併發下生成id

package com.dongnao.lock;

import java.text.SimpleDateFormat;
import java.util.Date;

public class OrderCodeGenerator {

    //自增長序列
    private static int i =0;

    //按照“年-月-日-小時-分鐘-秒-自增長序列”的規則生成訂單編號
    public String getOrderCode
(){ Date now = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"); return sdf.format(now)+ ++i; } public static void main(String[] args) { OrderCodeGenerator ong = new OrderCodeGenerator(); for (int i = 0; i < 10; i++) { System.out.println(ong.getOrderCode()); } } }

模擬100個執行緒去建立訂單id

程式碼說明:這裡我們用的java的發令槍來模擬併發CountDownLatch ,主函式執行 所有的執行緒都處於阻塞狀態 cdl.await();當 cdl.countDown();執行之後,所有執行緒開始併發執行 createOrder() ; 該方法中會用到 lock.lock(); 該lock 物件我們提供了三種例項,方式1是java自帶的,非分散式的。方式2,3是我們利用zookeeper 來實現的,這裡會貼出 方式2,3的具體程式碼,也會對比著去分析方式3差在那裡,如何優化到方式2這用利用zookeeper來實現分散式鎖,進而投入生產。

package com.dongnao.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderServiceImpl implements Runnable {

    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);

    private static final int NUM = 100;
    // 按照執行緒數初始化倒計數器
    private static CountDownLatch cdl = new CountDownLatch(NUM);

//  private static Lock lock = new ReentrantLock();  加鎖方式1
//  private Lock lock = new ZookeeperImproveLock();  加鎖方式2 
    private Lock lock = new ZookeeperLock();         加鎖方式3

    // 建立訂單介面
    public void createOrder() {
        String orderCode = null;
        lock.lock();
        try {
            // 獲取訂單編號
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        }finally{
            lock.unlock();
        }


        // ……業務程式碼,此處省略100行程式碼

        logger.info(Thread.currentThread().getName()
                + " =======================>" + orderCode);
    }

    @Override
    public void run() {
        try {
            // 等待其他執行緒初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 建立訂單
        createOrder();
    }

    public static void main(String[] args) {
        for (int i = 1; i <= NUM; i++) {
            // 按照執行緒數迭代例項化執行緒
            new Thread(new OrderServiceImpl()).start();
            // 建立一個執行緒,倒計數器減1
            cdl.countDown();
        }
    }

}

lock 物件 方式3

lock()方法是呼叫的入口,它去呼叫tryLock() 嘗試獲取鎖和阻塞其他執行緒,tryLock()中去建立持久節點LOCK,之前介紹過,持久節點只能有一個,所以其他執行緒去建立的時候,會丟擲ZkNodeExistsException 異常,tryLock()是非阻塞的,捕獲異常我們返回false, 在 lock() 中呼叫waitForLock(); 去阻塞執行緒和對LOCK節點的監聽,當鎖釋放了,繼續呼叫 lock(); 再去競爭鎖(遞迴)。

package com.dongnao.lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperLock implements Lock {

    private static final String ZK_IP_PROT = "localhost:2181";
//  private static final String ZK_IP_PROT = "13.206.6.232:2181";
    private static final String LOCK_NODE = "/LOCK";

    private ZkClient client = new ZkClient(ZK_IP_PROT);

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperLock.class);

    private CountDownLatch cdl=null;


    @Override
    //阻塞的方式去獲取鎖
    public void lock() {
        if(tryLock()){
            logger.info("=============get lock success==============");
        }else{
            waitForLock();
            lock();
        }

    }


    @Override
    //通過新建節點的方式去嘗試加鎖  非阻塞
    public boolean tryLock() {
        try {
            client.createPersistent(LOCK_NODE);
            return true;
        } catch (ZkNodeExistsException e) {
            return false;
        }
    }


    @Override
    public void unlock() {
        client.delete(LOCK_NODE);
    }

    private void waitForLock() {
        //1.建立一個監聽
        IZkDataListener listener = new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //3.當其他的執行緒釋放鎖,丟擲事件,讓其他執行緒重新競爭鎖
                logger.info("=============catch data delete event==============");
                if(cdl!=null){
                    cdl.countDown();
                }
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
             // TODO Auto-generated method stub
            }
        };
        client.subscribeDataChanges(LOCK_NODE, listener);
        //2.如果節點還存在,讓執行緒阻塞
        if(client.exists(LOCK_NODE)){
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        client.unsubscribeDataChanges(LOCK_NODE, listener);

    }

    public static void main(String[] args) throws InterruptedException {

        final CountDownLatch cdl = new CountDownLatch(1);
        ZkClient client = new ZkClient(ZK_IP_PROT);
        client.subscribeDataChanges(LOCK_NODE, new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("===============aaa===========");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {   
            }
        });
        cdl.await();
    }


    //--------------------不需要寫邏輯的方法--------------------

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

}

測試方式3的弊端
方式3不建議投入生產,弊端有兩個:(1)會出現死鎖。(2)基於zookeeper內部機制,所有產生連線的客戶端,當節點LOCK 刪除之後,zookeeper回給所有的客戶端傳送 刪除通知,這嚴重的影響了我們的效能。如果我們有100個客戶端,當拿到鎖的 執行緒去釋放鎖(刪除該節點)之後,zookeeper會通過http 告訴99個客戶端該節點刪除了。

測試步驟1

用命令 去建立一個LOCK節點,然後就會死鎖。因為對於這100個人來說,他們建立這個節點時發現已經存在了,它會拋異常,捕獲異常之後他們都會阻塞,沒有執行緒會去刪除這個節點,此時100個人永久等待。

這裡寫圖片描述

執行主函式
這裡寫圖片描述

啟動一堆執行緒之後,發現所有執行緒都是在阻塞

這裡寫圖片描述

同理:當一個執行緒建立這個節點之後,伺服器宕機了,網路延遲等導致這個LOCK 節點 沒有合理性的釋放,其他執行緒死鎖。

步驟二,我們命令刪除LOCK來測試第二個弊端

因為,我們在程式碼裡 寫了對 LOCK 節點的監聽client.subscribeDataChanges(LOCK_NODE, listener);所以命令刪除之後,100執行緒正常的去搶佔鎖資源,一切程式恢復正常。如圖

這裡寫圖片描述

這裡寫圖片描述

上圖我們會發現: 肉眼可見的所有執行緒在搶鎖,很慢,而且每次釋放鎖(刪除節點),會有 n - 1次通知,n 為當前最大執行緒個數。

方式二解決了以上兩個弊端

(1)我們用臨時節點,這樣就不會死鎖。
(2)我們每個執行緒只監聽他的上一個節點(排序),這樣通知就變為了1 。

這裡寫圖片描述

程式碼

package com.dongnao.lock;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperImproveLock implements Lock {

    private static final String LOCK_PATH = "/LOCK";

    private static final String ZOOKEEPER_IP_PORT = "localhost:2181";

    private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 1000, 1000, new SerializableSerializer());

    private static Logger logger = LoggerFactory.getLogger(ZookeeperImproveLock.class);

    private CountDownLatch cdl;

    private String beforePath;// 當前請求的節點
    private String currentPath;// 當前請求的節點前一個節點

    // 判斷有沒有LOCK目錄,沒有則建立
    public ZookeeperImproveLock() {
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    public void lock() {
        if (!tryLock()) {
            waitForLock();
            lock();
        } else {
            logger.info(Thread.currentThread().getName() + " 獲得分散式鎖!");
        }

    }

    /**
     * 為當前節點新增 監聽器
     */
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {

            // 刪除的時候去監聽
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕獲到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            // 發生改變的時候去監聽
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };
        // 給之前的節點增加資料刪除的watcher
        this.client.subscribeDataChanges(beforePath, listener);

        if (this.client.exists(beforePath)) { // 如果這個節點存在
            cdl = new CountDownLatch(1);
            try {
                cdl.await(); // 執行緒就給他阻塞,讓他等
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);

    }

    public boolean tryLock() {
        // 如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 建立一個臨時順序節點
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }
        // 獲取所有臨時節點並排序,臨時節點名稱為自增長的字串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        Collections.sort(childrens);

        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {// 如果當前節點在所有節點中排名第一則獲取鎖成功
            return true;
        } else {// 如果當前節點在所有節點中排名中不是排名第一,則獲取前面的節點名稱,並賦值給beforePath
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }
        return false;
    }

    public void unlock() {
        // 刪除當前臨時節點
        client.delete(currentPath);

    }

    // ===================用不到的實現方法=======================

    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub

    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }

    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

執行效果:

LOKC下建立有序號的 值

這裡寫圖片描述

這裡寫圖片描述

上圖發現秒級搶鎖, 方式2的程式碼就不介紹了,主要就是:獲取該節點下所有的 值,然後排序,取第一個,100個執行緒都有自己的編號,然後跟排序完第一個equals() 比較,肯定只有一個能批對上,其他99個去阻塞等~,再就是監聽上一個節點來保證通知只會發生一次保證效能。

基於Zookeeper實現分散式鎖 已經沒有問題了。多說一嘴,我們上面是通過java的發令槍CountDownLatch 來進行阻塞,實現悲觀鎖,如果我們不阻塞可以實現樂觀鎖。

補充:

該異常是因為客戶端和伺服器部署的zookeeper版本不相容導致,上面介紹過,我們的客戶端支援3.4.8 一下版本的 zookeeper, 以上版本就會出現如下異常。

這裡寫圖片描述