1. 程式人生 > >ZooKeeper - 分散式鎖

ZooKeeper - 分散式鎖

 ZooKeeper實現分散式鎖的優點:ZK可以建立持久節點、臨時節點和順序節點。臨時節點在會話結束之後,自動被刪除。即使發生宕機,只要超出心跳時間,就會斷開會話,從而刪除臨時節點,不會造成死鎖的現象。


  1.  指定競爭的資源,在ZK下生成持久節點。
  2. 在持久節點下,生成若干臨時順序節點,嘗試獲取鎖。
  3. 判斷該節點是否是序號最小的節點,若是,則獲取鎖;若不是,則阻塞。
  4. 通過ZK的Watcher監聽上一個節點的事件,滿足要求則解除阻塞。重複3操作。
  5. 最後所有的節點都獲得了鎖,再斷開ZK的會話連線,刪除所有臨時節點。
package com.mzs.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;

public class DistributeLock implements Watcher, Lock {

    private static Logger logger = Logger.getLogger("com.mzs.lock.DistributeLock");
    // 根節點路徑
    private final String path = "/root";
    private ZooKeeper zooKeeper;
    // 會話超時時間
    private final static int SESSION_TIME_OUT = 50000;
    // 當前鎖
    private String currentLock;
    // 上一個鎖
    private String waitLock;
    // 競爭的資源
    private String lockName;
    private CountDownLatch countDownLatch;

    public DistributeLock(String url, String lockName) {
        this.lockName = lockName;
        try {
            // 建立zookeeper會話
            zooKeeper = new ZooKeeper(url, SESSION_TIME_OUT, this);
            // 根節點是否存在
            Stat stat = zooKeeper.exists(path, false);
            if (stat == null) {
                // 建立持久節點,並擁有所有權
                zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 加鎖(普通獲取或者重入獲取)
     */
    @Override
    public void lock() {
        // 成功獲取
        if (tryLock())
            logger.info("[" + Thread.currentThread().getName() + "] --- get --- [" + lockName + "]");
        else {
            try {
                // 等待獲取
                waitLock(waitLock, SESSION_TIME_OUT);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 等待鎖
     * @param waitTime 超時時間
     * @return 是否成功等待
     */
    private boolean waitLock(String waitLock, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path + "/" + waitLock, true);
        // 等待鎖中有節點與之對應
        if (stat != null) {
            countDownLatch = new CountDownLatch(1);
            try {
                logger.info("[" + Thread.currentThread().getName() + "] --- wait --- [" + path + "/" + waitLock + "]");
                // 等待時長為waitTime毫秒
                boolean bool = countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
                String curentThreadName = Thread.currentThread().getName();
                logger.info("[" + curentThreadName + "] --- " + (!bool ? "wait for timeout,stop to get the lock" : "got the lock in the valid time"));
                // 等待結束
                countDownLatch = null;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

    /**
     * 加鎖(優先考慮中斷)
     *
     * @throws InterruptedException 被中斷異常
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock();
    }

    /**
     * 嘗試獲得鎖
     * @return 是否獲得鎖
     */
    @Override
    public boolean tryLock() {
        String separateStr = "/";
        String separateStr2 = "-";
        // 臨時順序節點的路徑
        String path_1 = path + separateStr + lockName + separateStr2;
        try {
            // 建立臨時順序節點
            currentLock = zooKeeper.create(path_1, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.info("[" + currentLock + "] --- " + " was created");
            // 定義一個根節點下所有子節點組成的列表
            List<String> nodeChildren = zooKeeper.getChildren(path, false);
            // 定義一個儲存包含lockName的節點的列表
            List<String> nodeList = new ArrayList<>();
            // 從nodeChildren中取出所有包含lockName的所有節點,加入到nodeList中
            for (String nodeElem : nodeChildren) {
                String node = nodeElem.split(separateStr2)[0];
                if (node.equals(lockName)) {
                    nodeList.add(nodeElem);
                }
            }
            // 排序nodeList(從小到大)
            Collections.sort(nodeList);
            logger.info("[" + Thread.currentThread().getName() + "] --- corresponding the lock [" + currentLock + "]");
            // 若當前鎖對應的是最小節點,則認為取得了鎖
            if (currentLock.equals(path + separateStr + nodeList.get(0))) {
                return true;
            }
            // 通過當前鎖檢視它所對應的節點
            String currentNode = currentLock.substring(currentLock.lastIndexOf(separateStr) + 1);
            // 二分查詢法查詢上一個節點
            int index = Collections.binarySearch(nodeList, currentNode) - 1;
            // 將上一個節點標記一個等待鎖
            waitLock = nodeList.get(index);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 表示獲取鎖失敗
        return false;
    }

    /**
     * 超時時間內,嘗試獲得鎖
     *
     * @param time 超時時間
     * @param unit 時間單位
     * @return 是否成功獲得鎖
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            // 已經成功獲取鎖
            if (tryLock())
                return true;
            // 等待獲取鎖
            return waitLock(waitLock, time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 釋放鎖
     */
    @Override
    public void unlock() {
        try {
            // 刪除當前鎖對應的節點
            zooKeeper.delete(currentLock, -1);
            logger.info("[" + Thread.currentThread().getName() + "] --- release --- [" + currentLock + "]");
            // 釋放當前鎖
            currentLock = null;
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    /**
     * 監聽節點
     * @param event 節點事件
     */
    @Override
    public void process(WatchedEvent event) {
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
        /*if (Event.KeeperState.SyncConnected == event.getState()) {
            if (Event.EventType.None == event.getType() && event.getPath() == null) {
                logger.info("establish the session");
            } else if (Event.EventType.NodeCreated == event.getType()) {
                logger.info("node [" + event.getPath() + "] was created");
            } else if (Event.EventType.NodeDeleted == event.getType()) {
                logger.info("node [" + event.getPath() + "] was deleted");
            } else if (Event.EventType.NodeDataChanged == event.getType()) {
                logger.info("node [" + event.getPath() + "] data changed");
            } else if (Event.EventType.NodeChildrenChanged == event.getType()) {
                logger.info("node [" + event.getPath() + "] children node changed");
            }
        }*/
    }
}

package com.mzs.lock;

import java.util.logging.Logger;

public class TestDistributeLock {

    private static Logger logger = Logger.getLogger("com.mzs.lock.TestDistributeLock");

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    DistributeLock distributeLock = null;
                    try {
                        distributeLock = new DistributeLock("192.168.101.199:2181", "lock");
                        distributeLock.lock();
                        logger.info("[" + Thread.currentThread().getName() + "] --- running" );
                    } finally {
                        if (distributeLock != null)
                            distributeLock.unlock();
                    }
                }
            });
            thread.start();
        }
    }

}

程式碼選自 --- https://www.cnblogs.com/liuyang0/p/6800538.html