1. 程式人生 > >淺談分散式鎖--基於Zookeeper實現篇

淺談分散式鎖--基於Zookeeper實現篇

淺談分散式鎖--基於Zookeeper實現篇:

1、基於zookeeper臨時有序節點可以實現的分散式鎖。其實基於ZooKeeper,就是使用它的臨時有序節點來實現的分散式鎖。

來看下Zookeeper能不能解決前面提到的問題。

    鎖無法釋放:使用Zookeeper可以有效的解決鎖無法釋放的問題,因為在建立鎖的時候,客戶端會在ZK中建立一個臨時節點,一旦客戶端獲取到鎖之後突然掛掉(Session連線斷開),那麼這個臨時節點就會自動刪除掉。其他客戶端就可以再次獲得鎖。
    非阻塞鎖:使用Zookeeper可以實現阻塞的鎖,客戶端可以通過在ZK中建立順序節點,並且在節點上繫結監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端可以檢查自己建立的節點是不是當前所有節點中序號最小的,如果是,那麼自己就獲取到鎖,便可以執行業務邏輯了。
    不可重入:

使用Zookeeper也可以有效的解決不可重入的問題,客戶端在建立節點的時候,把當前客戶端的主機資訊和執行緒資訊直接寫入到節點中,下次想要獲取鎖的時候和當前最小的節點中的資料比對一下就可以了。如果和自己的資訊一樣,那麼自己直接獲取到鎖,如果不一樣就再建立一個臨時的順序節點,參與排隊。
    單點問題:使用Zookeeper可以有效的解決單點問題,ZK是叢集部署的,只要叢集中有半數以上的機器存活,就可以對外提供服務。

2、實現原理:

原理就是:當某客戶端要進行邏輯的加鎖時,就在zookeeper上的某個指定節點的目錄下,去生成一個唯一的臨時有序節點, 然後判斷自己是否是這些有序節點中序號最小的一個,如果是,則算是獲取了鎖。如果不是,則說明沒有獲取到鎖,那麼就需要在序列中找到比自己小的那個節點,並對其呼叫exist()方法,對其註冊事件監聽,當監聽到這個節點被刪除了,那就再去判斷一次自己當初建立的節點是否變成了序列中最小的。如果是,則獲取鎖,如果不是,則重複上述步驟。


如圖,locker是一個持久節點,node_1/node_2/…/node_n 就是上面說的臨時節點,由客戶端client去建立的。
client_1/client_2/…/clien_n 都是想去獲取鎖的客戶端。以client_1為例,它想去獲取分散式鎖,則需要跑到locker下面去建立臨時節點(假如是node_1)建立完畢後,
看一下自己的節點序號是否是locker下面最小的,如果是,則獲取了鎖。如果不是,則去找到比自己小的那個節點(假如是node_2),找到後,就監聽node_2,直到node_2被刪除,
那麼就開始再次判斷自己的node_1是不是序列中最小的,如果是,則獲取鎖,如果還不是,則繼續找一下一個節點。

3、優點
    鎖安全性高,zk可持久化
4、缺點
    效能開銷比較高。因為其需要動態產生、銷燬瞬時節點來實現鎖功能。
5、實現
    可以直接採用zookeeper第三方庫curator即可方便地實現分散式鎖

6、zookeeper還有幾個特質,讓它非常適合作為分散式鎖服務。

    zookeeper支援watcher機制,這樣實現阻塞鎖,可以watch鎖資料,等到資料被刪除,zookeeper會通知客戶端去重新競爭鎖。
    zookeeper的資料可以支援臨時節點的概念,即客戶端寫入的資料是臨時資料,在客戶端宕機後,臨時資料會被刪除,這樣就實現了鎖的異常釋放。使用這樣的方式,就不需要給鎖增加超時自動釋放的特性了。

    zookeeper實現鎖的方式是客戶端一起競爭寫某條資料,比如/path/lock,只有第一個客戶端能寫入成功,其他的客戶端都會寫入失敗。
    寫入成功的客戶端就獲得了鎖,寫入失敗的客戶端,註冊watch事件,等待鎖的釋放,從而繼續競爭該鎖。


7、節點介紹:

       1、PERSISTENT-持久化目錄節點 
  客戶端與Zookeeper斷開連線後,該節點依舊存在 
  2、PERSISTENT_SEQUENTIAL-持久化順序編號目錄節點 
  客戶端與Zookeeper斷開連線後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號 
  3、EPHEMERAL-臨時目錄節點 
  客戶端與Zookeeper斷開連線後,該節點被刪除 
  4、EPHEMERAL_SEQUENTIAL-臨時順序編號目錄節點 
  客戶端與Zookeeper斷開連線後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號 

    監視器(watcher):

    當建立一個節點時,可以註冊一個該節點的監視器,當節點狀態發生改變時,watch被觸發時,ZooKeeper將會向客戶端傳送且僅傳送一條通知,因為watch只能被觸發一次。


8、如何利用這些特性來實現分散式鎖:
    1. 建立一個鎖目錄lock。
    2. 希望獲得鎖的執行緒A就在lock目錄下,建立臨時順序節點。
    3. 獲取鎖目錄下所有的子節點,然後獲取比自己小的兄弟節點,如果不存在,則說明當前執行緒順序號最小,獲得鎖。
    4. 執行緒B獲取所有節點,判斷自己不是最小節點,設定監聽(watcher)比自己次小的節點(只關注比自己次小的節點是為了防止發生“羊群效應”)。
    5. 執行緒A處理完,刪除自己的節點,執行緒B監聽到變更事件,判斷自己是最小的節點,獲得鎖。

    

9、例項實現:


public class ZKLock implements Watcher {

    private int threadId;
    private ZooKeeper zk = null;
    private String selfPath;
    private String waitPath;
    private String CURRENT_THREAD;
    private static final String GROUP_PATH = "/disLocks";
    private static final String SUB_PATH = "/disLocks/sub";
    private static final String CONNECTION_STRING = "192.168.10.41:2181";

    private static final int THREAD_NUM = 10;
    // 確保連線zk成功;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    // 確保所有執行緒執行結束;
    private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);

    private static final Logger LOG = LoggerFactory.getLogger(ZKLock.class);

    public ZKLock(int id) {
        this.threadId = id;
        CURRENT_THREAD = "【第" + threadId + "個執行緒】";
    }

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_NUM; i++) {
            final int threadId = i + 1;
            new Thread() {
                @Override
                public void run() {
                    try {
                        ZKLock dc = new ZKLock(threadId);
                        dc.createZKConnection(CONNECTION_STRING, 1000);
                        // GROUP_PATH不存在的話,由一個執行緒建立即可;

                        synchronized (threadSemaphore) {
                            dc.createPath(GROUP_PATH, "該節點由執行緒" + threadId + "建立", true);
                        }
                        dc.getLock();
                    } catch (Exception e) {
                        LOG.error("【第" + threadId + "個執行緒】 丟擲的異常:");
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        try {
            threadSemaphore.await();
            LOG.info("所有執行緒執行結束!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 建立節點
     * 
     * @param path
     *            節點path
     * @param data
     *            初始資料內容
     * @return
     */
    public boolean createPath(String path, String data, boolean needWatch)
            throws KeeperException, InterruptedException {
        if (zk.exists(path, needWatch) == null) {
            LOG.info(CURRENT_THREAD + "節點建立成功, Path: "
                    + this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
                    + ", content: " + data);
        }
        return true;
    }

    /**
     * 建立ZK連線
     */
    public void createZKConnection(String connectString, int sessionTimeout) throws IOException, InterruptedException {
        zk = new ZooKeeper(connectString, sessionTimeout, this);
        connectedSemaphore.await();
    }

    /**
     * 獲取鎖
     */
    private void getLock() throws KeeperException, InterruptedException {
        try {
            selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        }

        LOG.info(CURRENT_THREAD + "建立鎖路徑:" + selfPath);

        if (checkMinPath()) {
            // 如果是最小節點獲得鎖
            getLockSuccess();
        }
    }

    /**
     * 獲取鎖成功
     */
    public void getLockSuccess() throws KeeperException, InterruptedException {
        if (zk.exists(this.selfPath, false) == null) {
            LOG.error(CURRENT_THREAD + "本節點已不在了...");
            return;
        }
        LOG.info(CURRENT_THREAD + "獲取鎖成功!");

        // 處理業務邏輯
        Thread.sleep(2000);
        unlock();
    }

    /**
     * 釋放鎖
     */
    public void unlock() {

        LOG.info(CURRENT_THREAD + "刪除本節點:" + selfPath);

        try {
            zk.delete(this.selfPath, -1);

            releaseConnection();
            threadSemaphore.countDown();

            System.out.println(CURRENT_THREAD + "節點的鎖已經釋放了!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉ZK連線
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
            }
        }
        LOG.info(CURRENT_THREAD + "釋放連線");
    }

    /**
     * 檢查自己是不是最小的節點
     * 
     * @return
     */
    public boolean checkMinPath() throws KeeperException, InterruptedException {
        List<String> subNodes = zk.getChildren(GROUP_PATH, false);
        Collections.sort(subNodes);
        int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));
        switch (index) {
        case -1: {
            LOG.error(CURRENT_THREAD + "節點已不在了..." + selfPath);
            return false;
        }
        case 0: {
            LOG.info(CURRENT_THREAD + "節點可以獲得鎖了" + selfPath);
            return true;
        }
        default: {
            this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);
            LOG.info(CURRENT_THREAD + "獲取子節點中,排在我前面的" + waitPath);
            try {
                zk.getData(waitPath, true, new Stat());
                return false;
            } catch (KeeperException e) {
                if (zk.exists(waitPath, false) == null) {
                    LOG.info(CURRENT_THREAD + "子節點中,排在前面的" + waitPath + "已丟失");
                    return checkMinPath();
                } else {
                    throw e;
                }
            }
        }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event == null) {
            return;
        }

        Event.KeeperState keeperState = event.getState();
        Event.EventType eventType = event.getType();

        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                LOG.info(CURRENT_THREAD + "成功連線ZK伺服器");
                connectedSemaphore.countDown();
            } else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                LOG.info(CURRENT_THREAD + "排前面的節點丟失,判斷該節點是否可以獲得鎖?");
                try {
                    if (checkMinPath()) {
                        getLockSuccess();
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } else if (Event.KeeperState.Disconnected == keeperState) {
            LOG.info(CURRENT_THREAD + "與ZK伺服器斷開連線");
        } else if (Event.KeeperState.AuthFailed == keeperState) {
            LOG.info(CURRENT_THREAD + "許可權檢查失敗");
        } else if (Event.KeeperState.Expired == keeperState) {
            LOG.info(CURRENT_THREAD + "會話失效");
        }
    }
}


10、總結:(建議採用的方式)

    使用Zookeeper實現分散式鎖的優點

    有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題。實現起來較為簡單。

    使用Zookeeper實現分散式鎖的缺點

    效能上不如使用快取實現分散式鎖。 需要對ZK的原理有所瞭解。

 

每天努力一點,每天都在進步。