1. 程式人生 > >zookeeper — 實現分布式鎖

zookeeper — 實現分布式鎖

lean sta 會話期 競爭 all zookeeper scribe handle 獲取鎖

一.前言

在之前的文章中介紹過分布式鎖的特點和利用Redis實現簡單的分布式鎖。但是分布式鎖的實現還有很多其他方式,但是萬變不離其宗,始終遵循一個特點:同一時刻只能有一個操作獲取。這篇文章主要介紹如何基於zookeeper實現分布式鎖。

  • zookeeper能夠作為分布式鎖實現的基礎
  • 算法流程
  • 實現

關於分布式鎖的相關特性,這裏不再贅述,請參考分布式鎖。


二.zookeeper能夠作為分布式鎖實現的基礎

這裏回顧下分布式鎖的特點:

  • 每次只能一個占用鎖;
  • 可以重復進入鎖;
  • 只有占用者才可以解鎖;
  • 獲取鎖和釋放鎖都需要原子
  • 不能產生死鎖
  • 盡量滿足性能

zookeeper中有一種臨時順序節點,它具有以下特征:

  • 時效性,當會話結束,節點將自動被刪除
  • 順序性,當多個應用向其註冊順序節點時,每個順序號將只能被一個應用獲取

利用以上的特點可以滿足分布式鎖實現的基本要求:

  1. 因為順序性,可以讓最小順序號的應用獲取到鎖,從而滿足分布式鎖的每次只能一個占用鎖,因為只有它一個獲取到,所以可以實現重復進入,只要設置標識即可。鎖的釋放,即刪除應用在zookeeper上註冊的節點,因為每個節點只被自己註冊擁有,所以只有自己才能刪除,這樣就滿足只有占用者才可以解鎖

  2. zookeeper的序號分配是原子的,分配後即不會再改變,讓最小序號者獲取鎖,所以獲取鎖是原子的

  3. 因為註冊的是臨時節點,在會話期間內有效,所以不會產生死鎖

  4. zookeeper註冊節點的性能能滿足幾千,而且支持集群,能夠滿足大部分情況下的性能

三.算法流程

1.獲取鎖

需要獲取分布式鎖的應用都向zookeeper的/lock/{resouce}目錄下註冊sequence-前綴的節點,序號最小者獲取到操作資源的權限:

技術分享圖片

Note:
這裏的resource需要依據競爭的具體資源確定,如競爭賬戶則可以使用賬戶號作為resource。

從圖中可以看出,clientA的順序號最小,由它獲取到鎖,操作資源。

算法步驟

  1. client判斷/lock目錄是否存在,如果不存在則向其註冊/lock的持久節點
  2. client判斷/lock目錄下是否存在競爭的資源resouce目錄,如果不存在則向其註冊/lock/resource的持久節點
  3. client向/lock/resource目錄下註冊/lock/resource/sequence-前綴的臨時順序節點,並得到順序號
  4. client獲取/lock/resource目錄下的所有臨時順序子節點
  5. client判斷臨時子節點序號中是否存在比自身的序號小的節點。如果不存在,則獲取到鎖;如果存在,則對象該臨時節點做watch監控
  6. 如果收到監控的臨時節點被刪除的通知,則再重復4、5步驟,直到獲取到鎖

流程圖

技術分享圖片

2.釋放鎖

因為最小的節點只被獲取到鎖的client持有,所以該鎖不可能被其他client釋放。同時釋放鎖只需要將臨時順序節點刪除,也是原子性操作。


三.實現

/**
 * 基於Zookeeper實現分布式鎖
 *
 * @author huaijin
 */
public class DistributedLockBaseZookeeper implements DistributedLock {

    private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);

    /**
     * 利用空串作為各個節點存儲的數據
     */
    private static final String EMPTY_DATA = "";

    /**
     * 分布式鎖的根目錄
     */
    private static final String LOCK_ROOT = "/lock";

    /**
     * zookeeper目錄分隔符
     */
    private static final String PATH_SEPARATOR = "/";

    /**
     * 臨時順序節點前綴
     */
    private static final String LOCK_NODE_PREFIX = "sequence-";

    /**
     * 利用Lock和Condition實現等待通知
     */
    private Lock waitNotifierLock = new ReentrantLock();
    private Condition waitNotifier = waitNotifierLock.newCondition();

    /**
     * 操作zookeeper的client
     */
    private ZkClient zkClient;

    /**
     * 分布式資源的路徑
     */
    private String resourcePath;

    /**
     * 鎖節點完整前綴
     */
    private String lockNodePrefix;

    /**
     * 當前註冊的臨時順序節點路徑
     */
    private String currentLockNodePath;

    public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
        Objects.requireNonNull(zkClient, "zkClient must not be null!");
        if (resource == null || resource.isEmpty()) {
            throw new IllegalArgumentException("resource must not be null!");
        }
        this.zkClient = zkClient;
        this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
        this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;

        // 創建分布式鎖根目錄
        if (!this.zkClient.exists(LOCK_ROOT)) {
            try {
                this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
            } catch (ZkNodeExistsException e) {
                // ignore, logging
                log.warn("The root path for lock already exists.");
            }
        }

        // 創建資源目錄
        if (!this.zkClient.exists(resourcePath)) {
            try {
                this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
            } catch (ZkNodeExistsException e) {
                // ignore, logging
                log.warn("The resource path for [" + resourcePath + "] already exists.");
            }
        }
    }


    @Override
    public void lock() throws DistributedLockException {
        if (!acquireLock()) {
            // 如果獲取鎖不成功,則等待
            waitNotifierLock.lock();
            try {
                waitNotifier.await();
            } catch (Exception e) {
                throw new DistributedLockException("Interrupt when waiting notification.");
            } finally {
                waitNotifierLock.unlock();
            }
        }
    }

    @Override
    public void unlock() {
        // 刪除自身節點,釋放鎖
        zkClient.delete(currentLockNodePath);
    }

    private boolean acquireLock() throws DistributedLockException {
        // 如果當前未註冊臨時順序節點,則註冊
        if (this.currentLockNodePath == null) {
            this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
        // 獲取順序號
        long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
        // 獲取所有子節點
        List<String> childNodePaths = zkClient.getChildren(resourcePath);
        if (childNodePaths == null || childNodePaths.isEmpty()) {
            throw new DistributedLockException("Not exists child nodes.");
        }
        // 從所有子節點中獲取最小子節點的順序號
        long minSeq = 1000000L;
        int minIndex = -1;
        for (int i = 0; i < childNodePaths.size(); i++) {
            long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
            if (nodeSeq < minSeq) {
                minSeq = nodeSeq;
                minIndex = i;
            }
        }
        // 比較自身順序號與最小序號
        if (lockNodeSeq > minSeq) {
            // 如果存在更小序號,則監控最小序號的子節點
            String minLockNodePath = childNodePaths.get(minIndex);
            zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
                    new ListenerForLockRelease());
            return false;
        }
        // 成功獲取鎖,返回
        return true;
    }

    private long fetchSeqFromNodePath(String nodePath) {
        String seq = nodePath.substring(lockNodePrefix.length());
        return Long.valueOf(seq);
    }

    private class ListenerForLockRelease implements IZkDataListener {

        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
        }

        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            // 如果成功獲取鎖,則通知,讓主線程返回
            if (acquireLock()) {
                waitNotifierLock.lock();
                try {
                    waitNotifier.signal();
                } finally {
                    waitNotifierLock.unlock();
                }
            }
        }
    }
}

zookeeper — 實現分布式鎖