1. 程式人生 > >【zookeeper】Apache curator的使用及zk分散式鎖實現

【zookeeper】Apache curator的使用及zk分散式鎖實現

接上篇,本篇主要講Apache開源的curator的使用,有了curator,利用Java對zookeeper的操作變得極度便捷.

其實在學之前我也有個疑慮,我為啥要學curator,撇開漲薪這些外在的東西,就單技術層面來講,學curator能幫我做些什麼?這就不得不從zookeeper說起,上篇我已經大篇幅講了zk是做什麼的了,但真正要靠zk去實現多伺服器自動拉取更新的配置檔案等功能是非常難的,如果沒有curator,直接去寫的話基本上能把你累哭,就好比連Mybatis或者jpa都沒有,讓你用原生的程式碼去寫個網站一樣,你可以把curator當做一個比較強大的工具,有了它操作zk不再是事,說這麼多,是時候進入正題了:

curator 官網:http://curator.apache.org

使用curator去實現的幾塊內容:


學習目錄:
1.使用curator建立與zk的連線
2.使用curator新增/遞迴新增節點
3.使用curator刪除/遞迴刪除節點
4.使用curator建立/驗證 ACL(訪問許可權列表)
5.使用curator監聽 單個/父 節點的變化(watch事件)
---------------------------------------------
6.基於curator實現zookeeper分散式鎖(需要掌握基本的多執行緒知識)

前置條件:已掌握zookeeper的基本操作,對zookeeper有所瞭解,如果沒有掌握請翻閱我前面的部落格去學習.

本節所需要引入的依賴有以下三個,建議直接全部引入即可:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

1.通過curator建立與zk的連線

需要準備連線zk的url,建議直接寫成工具類,因為接下來會頻繁用到,功能類似於jdbc.

public class ZkClientUtil {
    private static final int BASE_SLEEP_TIME_MS = 5000; //定義失敗重試間隔時間 單位:毫秒
    private static final int MAX_RETRIES = 3; //定義失敗重試次數
    private static final int SESSION_TIME_OUT = 1000000; //定義會話存活時間,根據業務靈活指定 單位:毫秒
    private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和埠號
    private static final String NAMESPACE = "laohan_jianshen";
    //工作空間,可以不指定,建議指定,功能類似於專案包,之後建立的所有的節點都會在該工作空間下,方便管理
    
    public static CuratorFramework build(){
    //建立比較簡單,鏈式程式設計,很爽,基本上指定點引數就OK了
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重試策略
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(ZK_URI)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .sessionTimeoutMs(SESSION_TIME_OUT)
                .build();
        return client;
    }
}

2.通過curator新增/遞迴新增節點

//通過上一步獲取到的client,直接啟動該client,值得注意的是client在使用前必須先啟動:
client.start;
client
.create()//建立節點
.withMode(CreateMode.xxx)//節點屬性:永久節點/臨時節點/有序節點 通過CreateMode.即可看到
.withACL(ZooDefs.Ids.xxx)//節點訪問許可權,通過Ids.即可看到 預設是OPEN_ACL_UNSAFE(開放不安全許可權)
.forPath("/yourpath","yourdata".getBytes());//指明你的節點路徑,資料可以不指定,資料必須是byte[]

建立遞迴節點:

//比如我想一次性建立/yourpath/a/b/c/1/2/3...這樣的節點,如果按傳統方法會累死你
//curator可以一次性建立好,只需要在建立時新增creatingParentsIfNeeded即可.
client
.create()//建立節點
.creatingParentsIfNeeded()//建立父節點,如果需要的話

...

3.使用curator刪除/遞迴刪除節點

client
.delete() //刪除
.guaranteed()//保證一定幫你刪了它
.withVersion(0)//指定要刪節點的版本號
.forPath("/yourpath")//指定要刪節點的路徑

遞迴刪除:

//比如我當前的節點結構是這樣:/yourpath/a/b/c/1/2/3  我想刪除a節點下面的所有目錄
//傳統方法累死個人,現在只需要新增deletingChildrenIfNeeded即可
client
.delete() //刪除
.deletingChildrenIfNeeded()//如果它有兒子都給刪了...

4.使用curator建立/驗證 ACL(訪問許可權列表)

//為了保證安全,有時需要對節點的訪問許可權做一些限制,否則可能會引起重要資訊洩露/篡改/刪除等
//節點ACL的建立方式有兩種,一種是使用ZK提供的,一種是自定義的
//1.ZK提供的,比較簡單,拿來即用,在建立節點時指明withACL即可
client
.create()
.withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明該節點是隻讀節點,還有其他屬性,可以通過Ids.檢視
//建立自定義ACL,需要自己new Id(),並指明是否是加密的,然後賬號和密碼是多少,加密策略使用zk提供的:
List<ACL> aclList = new ArrayList<ACL>();
ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456")));
aclList.add(acl1);
aclList.add(acl2);
//如此我就建立好了兩種不同的許可權賬號,user只能對該節點有讀的許可權,但root使用者對該節點有所有許可權
//ACL驗證,建立好節點之後,可以在伺服器的zk安裝目錄的bin目錄下 連線客戶端./zkCli
//然後通過ls /該目錄  檢視是否可以訪問 正常是不能訪問的 會提示許可權不夠
//下面我們通過curator去連線,要想訪問該節點需要在建立client時就指明賬號和密碼:
CuratorFramework client = CuratorFrameworkFactory
.builder()
.authorization("digest","root:123456".getBytes())//指明使用了加密,使用者名稱和密碼用:隔開,以byte[]輸入
//如此,接下來通過該client可以對剛剛建立的節點具有所有許可權,如果登入的是user,則只具有讀許可權.

5.通過curator建立單個節點及其父節點的watch事件

由於zk的watch事件是隻能被觸發一次的,觸發完即銷燬監聽,這顯然不是我們想要的,在實際開發中更多的場景是需要對某個節點持續監聽,所以這裡我只介紹建立持續監聽的單節點/父節點

//對單個節點建立watch事件
//定義NodeCache,指明被監聽節點的路徑:
final NodeCache nodeCache = new NodeCache(client,"/yourpath");
nodeCache.start(true);//開啟
nodeCache
.getCurrentData()//可以獲取該監聽節點的資料
.getPath();//可以獲取該監聽節點的路徑

//對指定父節點建立watch事件,只要其任何一個子節點,或子節點的子節點...發生變化,就會觸發watch事件.
//定義PathChildrenCache,指明要watch的目錄
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath");
//啟動,啟動策略有三種:同步,非同步提交,非同步 用的比較多的就是下面這種,用StartMode.可以檢視到
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//對該節點建立監聽器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    //TODO 可以通過PathChildrenCacheEvent.拿到你想要的資料和路徑等
    }
});

至此,curator的常用內容已學習完畢,建議每個都親自操作一下,為之後的自動配置和分散式鎖操作打下基礎.

 


6.基於curator實現zookeeper分散式鎖

先來了解下分散式鎖應用場景:

比如我有一個電商商城,為了提高系統的服務量,業務被拆分小了,分別部署在不同的伺服器上,下單成功後訂單系統通知庫存系統要及時減庫存,但此時併發極高,庫存系統還沒來得及減庫存,又有新的人進來了,讀取了庫存,資料被髒讀了,於是他也下單成功了,但發貨的時候發現庫存不夠了,於是鬧得兩家都不開心...這時候就需要分散式鎖來解決.

為了演示這樣的場景,我寫了個小專案來模擬這種場景:

我資料庫裡電腦庫存只有10臺,然後寫了兩個下單頁面,每個訂單買8臺電腦,讓執行緒適當休眠來模擬高併發下的延遲,於是我幾乎同時訪問了這兩個下單頁面,最後兩邊都提示我下單成功了,但資料庫裡的庫存數量變成了-6

 

這顯然不是我想要的結果,正確的應該是,這兩個人裡只有一個人下單成功,另外一個人下單失敗,提示庫存不足,最後資料庫裡剩2臺電腦庫存.

為了解決這個問題,就必須讓第一個人下單時,第二個人不能下單,只能等到第一個人下單完成後方可下單,或者第二個人下單時,第一個人不能下單,只能等到第二人下單完成方可下單,由於兩套系統是分開部署的,不能像以前那樣用同步鎖/同步程式碼塊Synchronized來解決了,這個時候就需要引出分散式鎖,分散式鎖可以用Redis或者zookeeper等實現,這篇主要講一下用zk去實現.

思路:提供一把全域性的鎖,所有來購買的請求競爭這一把鎖,誰先拿到這把鎖,誰就有資格執行下單,沒搶到鎖的請求被掛起,等待有鎖的請求完成下單後釋放鎖,然後喚醒被掛起的請求繼續去競爭這把鎖...

可以把這把鎖當做是zk上的一個節點,所有請求發起時,建立該節點,第一個建立該節點成功的請求就意味著獲得了鎖,其他請求建立都會丟擲異常,然後捕獲該異常,用全域性的countDownLatch將該請求掛起,等獲得鎖的節點完成下單後,把該節點刪除(釋放鎖),然後計數器-1,把掛起的執行緒都喚醒,繼續去競爭該鎖...

下面就順著這個思路一起去實現分散式鎖:

這裡預設使用上面已經寫好的連線ZK的工具類來建立client.

public class  ZkLockUtil {
    //分散式鎖,用於掛起當前執行緒,等待上一把分散式鎖釋放
    private static CountDownLatch DISTRIBUTE_LOCK = new CountDownLatch(1);
    //分散式鎖的總結點名
    private final static String ZK_LOCK_PROJECT = "zk-lock";
    //分散式鎖節點名
    private final static String DISTRIBUTE_LOCK_NAME = "distribute-lock";
    /**
     * 獲取分散式鎖
     */
    public static void getLock() {
        CuratorFramework client = ZkClientUtil.build();
        client.start();
        while (true) {
            try { 
                   client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + ZK_LOCK_PROJECT + "/" + DISTRIBUTE_LOCK_NAME);
                System.out.println("獲取分散式鎖成功...");
                return;
            } catch (Exception e) {
                try {
                    //如果沒有獲取到鎖,需要重新設定同步資源值
                    if (DISTRIBUTE_LOCK.getCount() <= 0) {
                        DISTRIBUTE_LOCK = new CountDownLatch(1);
                    }
                    System.out.println("獲取分散式鎖失敗,等待他人釋放鎖中...");
                    DISTRIBUTE_LOCK.await();
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
    }

    /**
     * 釋放鎖資源
     */
    public static void  release(String path) {
        CuratorFramework client = ZkClientUtil.build();
        client.start();
        try {
            client.delete().forPath(path);
            System.out.println("鎖釋放成功...");
        } catch (Exception e) {
            System.out.println("釋放鎖失敗...");
            e.printStackTrace();
        } finally {
            client.close();
        }
    }

    /**
     * 為指定路徑節點建立watch,觀察鎖狀態
     */
    public static void addWatcher2Path(final String path) throws Exception {
        CuratorFramework client = ZkClientUtil.build();
        client.start();
        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        System.out.println("建立觀察者成功...");
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String nodePath = pathChildrenCacheEvent.getData().getPath();
                    System.out.println("上一會話已釋放鎖或會話已斷開...節點路徑為:" + nodePath);
                    if (nodePath.contains(DISTRIBUTE_LOCK_NAME)) {
                        DISTRIBUTE_LOCK.countDown();
                        System.out.println("釋放計數器,計數器值為:"+DISTRIBUTE_LOCK.getCount()+"讓當前請求來獲取分散式鎖...");
                    }
                }
            }
        });
    }
}

下面來測試一下,有空的話你可以寫一個類似我這種下單的模式去測試,如果時間緊寫個測試類模擬也無妨:

public class Test {
    public static void main(String[] args) {
        final ExecutorService threadpool = Executors.newCachedThreadPool();
        System.out.println("開始購買...");
        for (int i = 0; i <2 ; i++) {
            threadpool.execute(new Runnable() {
                public void run() {
                    System.out.println("我是執行緒:"+Thread.currentThread().getName()+"我開始搶購了...");
                     ZkLockUtil.getLock();
                    System.out.println(Thread.currentThread().getName()+":我正在瘋狂的剁手購買中...");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+":我買完了,有請下一位...");
                    try {
                        ZkLockUtil.addWatcher2Path("/zk-lock");
                        System.out.println("新增完畢...");
                        ZkLockUtil.release("/zk-lock/distribute-lock");
                        System.out.println("釋放完畢...");
                        Thread.sleep(1000);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

測試結果可以看出,在其中一個執行緒購買時,另外一個執行緒被掛起等待...

或者用我這種2個服務一起下單模式的:

@RestController
public class SellController {
    @Autowired
    private SellRepository sellRepository;
    @RequestMapping("sell")
    public String sell(@RequestParam("num")Integer num){
        String msg = "下單成功!";
        try {
            Thread.sleep(3000);
            ZkLockUtil.getLock();
            Sell sell = sellRepository.getOne(1);
            int remain = sell.getStock()-num;
            sell.setStock(remain);
            if (remain >= 0){
                sellRepository.save(sell);
            }else{
                msg = "下單失敗,庫存不足...";
            }
            ZkLockUtil.addWatcher2Path("/zk-lock");
            ZkLockUtil.release("/zk-lock/distribute-lock");
        } catch (Exception e) {
            e.printStackTrace();
            msg = "下單異常...";
        }
        return msg;
    }

測試結果:

資料庫中電腦庫存由原來的10變為2,達到期望效果:

但這樣就大功告成了嗎? 其實我覺得沒有,原因是因為我在測試的時候發現一個問題,當我在瀏覽器中按住F5進行重新整理頁面(模擬高併發下的請求頻率),在一開始一切都是正常的,刷一陣子之後八爾哥就出來了:

找了一下原因,發現問題出在了這裡:

由於我不斷的重新整理頁面,就意味著不斷的去獲取鎖和釋放鎖,當鎖被釋放後計數器減1,會去喚醒執行緒去競爭鎖,然後這個時候來沒來的及喚醒,新的請求又進來了,此時新請求建立鎖成功了,被喚醒的執行緒又搶不到鎖了,但計數器仍處於0的狀態,它會繼續去建立鎖,此時又有新的請求不斷進來,不斷建立鎖...導致zk認為你是在不斷的進行重複操作,於是它就把連線給退出了,然後又有新請求進來了,又要重新建立連線:

新建立的連線又會像上面一樣在連綿不斷的請求中斷開,這樣頻繁的連線和斷開,重複數次之後,ZK直接關閉了連線,導致後臺無限報錯...

為了解決這個問題,我搜羅各大網站,最後沒有找到什麼可以參考的東西,我甚至開始懷疑是我寫的鎖有問題,但後來我在apache curator官網找到了解釋,其實curator已經幫我們封裝好了一套分散式鎖,可以直接拿來用的:

於是我直接呼叫了zk封裝好的這一套分散式鎖去做測試,普通情況下跟我自己寫的分散式鎖沒啥兩樣,效果一樣,然後我繼續用F5模擬高併發下的情況,結果跟我自己的鎖如出一轍...也是報同樣的錯,錯誤原因也一樣,最後閱讀了下原始碼,其實自己寫的跟apache寫的沒啥兩樣,原理都一樣,只是人家封裝的更方便別人使用一些...

    @RequestMapping("plus")
    public String sellPlus(@RequestParam("num")int num) throws Exception {
        CuratorFramework client = ZkClientUtil.build();
        client.start();
        String lockPath = "/plus_lock";
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        String msg = "下單成功!";
        if ( lock.acquire(3, TimeUnit.SECONDS) )
        {
            try
            {
                Sell sell = sellRepository.getOne(1);
                int remain = sell.getStock()-num;
                if (remain >= 0){
                    sell.setStock(remain);
                    sellRepository.save(sell);
                }else {
                    msg = "下單失敗,庫存不足...";
                }
            }
            finally
            {
                lock.release();
            }
        }
        return msg;
    }

我總不能去懷疑apache 寫的鎖也有問題吧,那問題應該就出在了zk建立連線或者zk過濾連線的機制上,應該是zk以為那些頻繁關閉又連線的請求是被攻擊或者無效的請求,所以強制關閉這些連線,目前尚未去研究zk的連線機制,也不清楚研究了是否能解決該問題,所以基於zk的分散式鎖就講到這裡,同時在我心裡它已經不是做分散式鎖的首選了,我會考慮使用redis或者其它分散式鎖去解決,尤其是在高併發的情況下,感興趣的可以繼續關注本博,在redis系列教程中,我會講如何用redis實現分散式鎖.