CSDN's Most Complete Guide to ZooKeeper Distributed Locks

1 Scenario
In a distributed application, multiple processes often provide the same service. These processes may run on the same machine or be spread across different machines. If they share some resources, a distributed lock may be needed to coordinate access to those resources.

2 Approach
When a process needs to access shared data, it creates a sequence-type child node under the "/locks" node, called thisPath. When thisPath is the smallest of all the children, the process has acquired the lock and may access the shared resource. When it is done, it deletes thisPath, and the lock passes to the new smallest child node.

With the overall idea clear, a few details remain. How does a process know whether thisPath is the smallest of all the children? When creating the node, it can fetch the child list with getChildren, find the node ranked immediately before thisPath in that list, called waitPath, and register a watch on waitPath. When waitPath is deleted, the process is notified, which means it has acquired the lock.
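
The smallest-child/predecessor rule above can be sketched as a small pure helper. This is a hypothetical illustration, not part of the article's implementation (the class and method names `LockDecision`/`waitPathFor` are invented here); it assumes sequential znode names sort lexicographically, which holds when they share a prefix and a fixed-width, zero-padded sequence suffix:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating the selection rule described above.
public class LockDecision {

    // Returns null when thisPath is the smallest child (we hold the lock),
    // otherwise the name of the child ranked immediately before thisPath
    // (the waitPath to register a watch on).
    public static String waitPathFor(List<String> children, String thisPath) {
        Collections.sort(children);
        int i = children.indexOf(thisPath);
        if (i < 0) {
            throw new IllegalArgumentException("thisPath not among children: " + thisPath);
        }
        if (i == 0) {
            return null; // smallest child: the lock is ours
        }
        return children.get(i - 1); // watch our immediate predecessor
    }
}
```

For example, with children ["lock-0000000002", "lock-0000000001"], the node "lock-0000000001" holds the lock, and "lock-0000000002" should watch "lock-0000000001".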

3 Algorithm
The lock operation:

First, for each lock scenario, designate a corresponding root node in ZooKeeper that records the resource contention;

After each lock is created, a node is lazily created under that root in ZooKeeper as the marker for the contending party. (Trick: the node is EPHEMERAL_SEQUENTIAL, an auto-numbered ephemeral node);

To lock, fetch all children of the lock's root node, i.e., all identifiers currently contending for the resource;

Following the principle of fair contention, sort them by their sequence numbers and take the node with the smallest number as the lock owner. If your own node id is the owner id, return: the lock is acquired.

If your id is not the owner id, find the id ranked immediately before yours in the sorted order and watch for its release (an exists watcher), forming a chain of triggers;

The unlock operation:

Simply delete the node for your own id. The next node in the queue then receives the watcher event, wakes up, acquires the lock, and proceeds;

A few key points:

Choosing EPHEMERAL_SEQUENTIAL for the node matters.

The auto-numbering makes it easy to build a fair lock: each node wakes the one behind it, forming a chain of triggers. This effectively avoids the "thundering herd" problem (one lock release waking every waiting thread); waking only the next waiter improves performance.

The EPHEMERAL property matters too. Talking to ZooKeeper is a network operation with many uncontrollable factors; for instance, if the network drops, the previous node's release operation fails. An ephemeral node is tied to its session: once the session times out or exits abnormally, the node disappears, much like the interrupt handling of a waiting Thread in ReentrantLock's queue.

Acquiring the lock is a blocking operation, while the watcher fires as an asynchronous event, so a shared boolean mutex (BooleanMutex) is used for notification; it also conveniently handles lock reentry. (Reentry can be thought of as repeated reads, and lock release as a write preemption.)

Caveats:

Using EPHEMERAL introduces a risk: under abnormal conditions, high network latency can cause a session timeout. ZooKeeper then considers the client gone and destroys its id marker, and the next contending id can acquire the lock. Two processes may then hold the lock and run the task at the same time, so choosing the session timeout carefully is important.

Likewise, using PERSISTENT carries a deadlock risk: if a process exits abnormally, its contention id is never deleted, and the next id can never acquire the lock.

4 Implementation

  1. DistributedLock.java source: the distributed lock
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
 
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
 
/**
 * ZooKeeper distributed lock
 */
public class DistributedLock {
 
    private static final int SESSION_TIMEOUT = 10000;
 
    private static final int DEFAULT_TIMEOUT_PERIOD = 10000;
 
    private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
 
    private static final byte[] data = {0x12, 0x34};
 
    private ZooKeeper zookeeper;
 
    private String root;
 
    private String id;
 
    private LockNode idName;
 
    private String ownerId;
 
    private String lastChildId;
 
    private Throwable other = null;
 
    private KeeperException exception = null;
 
    private InterruptedException interrupt = null;
 
    public DistributedLock(String root) {
        try {
            this.zookeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, null);
            this.root = root;
            ensureExists(root);
        } catch (IOException e) {
            e.printStackTrace();
            other = e;
        }
    }
 
    /**
     * Attempt to acquire the lock; blocks, and can be interrupted
     */
    public void lock() throws Exception {
        // initialization may already have failed
        if (exception != null) {
            throw exception;
        }
 
        if (interrupt != null) {
            throw interrupt;
        }
 
        if (other != null) {
            throw new Exception("", other);
        }
 
        if (isOwner()) {// lock reentry
            return;
        }
 
        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // a ZooKeeper restart can lose the watcher and deadlock, so a timeout with retry is used
        try {
//            mutex.lockTimeOut(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS);// block until the value becomes true
            mutex.lock();
        } catch (Exception e) {
            e.printStackTrace();
            if (!mutex.state()) {
                lock();
            }
        }
 
        if (exception != null) {
            throw exception;
        }
 
        if (interrupt != null) {
            throw interrupt;
        }
 
        if (other != null) {
            throw new Exception("", other);
        }
    }
 
    /**
     * Attempt to acquire the lock without blocking
     *
     * @throws InterruptedException
     * @throws KeeperException
     */
    public boolean tryLock() throws Exception {
        // initialization may already have failed
        if (exception != null) {
            throw exception;
        }
 
        if (isOwner()) { // lock reentry
            return true;
        }
 
        acquireLock(null);
 
        if (exception != null) {
            throw exception;
        }
 
        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }
 
        if (other != null) {
            throw new Exception("", other);
        }
 
        return isOwner();
    }
 
    /**
     * Release the lock
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            //do nothing
        }
    }
 
    /**
     * Check whether the node at the given path exists; create it if not
     * @param path
     */
    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }
            zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }
 
    /**
     * Return the path this lock corresponds to
     */
    public String getRoot() {
        return root;
    }
 
    /**
     * Whether the current client is the owner of the lock
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }
 
    /**
     * Return the current node id
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper methods =============================

    /**
     * Perform the lock operation; the mutex argument controls whether the call blocks
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) { // build the unique identifier for this lock
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // on the first pass, create a node
                    String path = zookeeper.create(root + "/" + prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }
 
                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null; // abnormal case: recreate the node
                    } else {
                        // sort the nodes
                        SortedSet<LockNode> sortedNames = new TreeSet<>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }
 
                        if (!sortedNames.contains(idName)) {
                            id = null;// reset to null and recreate
                            continue;
                        }
 
                        // take the first node as the ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.unlock();// update the state and return
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }
 
                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            // watch the nearest node queued just ahead of us
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            // asynchronous watcher handling
                            Stat stat = zookeeper.exists(root + "/" + lastChildId, new Watcher() {
                                public void process(WatchedEvent event) {
                                    acquireLock(mutex);
                                }
                            });
 
                            if (stat == null) {
                                acquireLock(mutex);// the node no longer exists, so the watch was never registered; retrigger ourselves
                            }
                        } else {
                            if (isOwner()) {
                                mutex.unlock();
                            } else {
                                id = null;// our node may have expired, so id and ownerId differ
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.unlock();
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.unlock();
            }
        }
 
        if (isOwner() && mutex != null) {
            mutex.unlock();
        }
        return Boolean.FALSE;
    }
}
  2. BooleanMutex.java source: a shared boolean mutex
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
/**
 * Shared boolean mutex
 */
public class BooleanMutex {
 
    private Sync sync;
 
    public BooleanMutex() {
        sync = new Sync();
        set(false);
    }
 
    /**
     * Block until the boolean becomes true
     * @throws InterruptedException
     */
    public void lock() throws InterruptedException {
        sync.innerLock();
    }
 
    /**
     * Block until the boolean becomes true, with a timeout
     * @param timeout
     * @param unit
     * @throws InterruptedException
     * @throws TimeoutException
     */
    public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        sync.innerLock(unit.toNanos(timeout));
    }
 
    public void unlock(){
        set(true);
    }
 
    /**
     * Reset the boolean mutex
     * @param mutex
     */
    public void set(Boolean mutex) {
        if (mutex) {
            sync.innerSetTrue();
        } else {
            sync.innerSetFalse();
        }
    }
 
    public boolean state() {
        return sync.innerState();
    }
 
    /**
     * Synchronizer backing the boolean mutex
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;
 
        /**
         * When the state is 1, wake all threads blocked while it was FALSE
         */
        private static final int TRUE = 1;
        /**
         * When the state is 0, the current thread blocks until woken
         */
        private static final int FALSE = 0;
 
        /**
         * A return value greater than 0 means proceed; less than 0 means block
         */
        protected int tryAcquireShared(int arg) {
            return getState() == 1 ? 1 : -1;
        }
 
        /**
         * AQS hook: decide whether the shared lock can be released
         */
        protected boolean tryReleaseShared(int ignore) {
            // always true: release is always permitted
            return true;
        }
 
        private boolean innerState() {
            return getState() == 1;
        }
 
        private void innerLock() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }
 
        private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
        }
 
        private void innerSetTrue() {
            for (;;) {
                int s = getState();
                if (s == TRUE) {
                    return; // already true, exit
                }
                if (compareAndSetState(s, TRUE)) {// CAS the state to avoid racing concurrent set-true updates
                    releaseShared(0);// release the shared lock, waking any blocked threads
                }
            }
        }
 
        private void innerSetFalse() {
            for (;;) {
                int s = getState();
                if (s == FALSE) {
                    return; // already false, exit
                }
                if (compareAndSetState(s, FALSE)) {// CAS the state to avoid racing concurrent set-false updates
                    return;
                }
            }
        }
    }
}
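
The pattern BooleanMutex implements — block while a boolean is false, flip it to true to wake every waiter at once — is essentially the "BooleanLatch" example from the AbstractQueuedSynchronizer documentation. Here is a self-contained miniature for experimenting with the idea (the class name `BooleanLatch` and its method names are chosen here for illustration):

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A self-contained miniature of the BooleanMutex idea: await() blocks while
// the state is false; set() flips it to true and wakes all waiters at once.
public class BooleanLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isTrue() {
            return getState() == 1;
        }

        // A shared acquire succeeds only once the state has become 1.
        @Override
        protected int tryAcquireShared(int ignore) {
            return getState() == 1 ? 1 : -1;
        }

        // Releasing sets the state to 1; AQS then propagates the wake-up.
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isTrue() {
        return sync.isTrue();
    }

    public void set() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }
}
```

Unlike BooleanMutex above, this latch is one-shot (it cannot be reset to false), which is all the watcher-notification path actually needs for a single acquisition.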
  3. Test class:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import org.apache.zookeeper.KeeperException;
 
/**
 * Distributed lock test
 * @version 1.0
 */
public class DistributedLockTest {
 
    public static void main(String [] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final DistributedLock node = new DistributedLock("/locks");
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
//                        node.tryLock(); // acquire without blocking
                        node.lock(); // acquire, blocking
                        Thread.sleep(100);
 
                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            node.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }
 
                }
            });
        }
 
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        executor.shutdown();
    }
}

Console output:

id: x-239027745716109354-0000000248 is leader: true
id: x-22854963329433645-0000000249 is leader: true
id: x-22854963329433646-0000000250 is leader: true
id: x-166970151413415997-0000000251 is leader: true
id: x-166970151413415998-0000000252 is leader: true
id: x-166970151413415999-0000000253 is leader: true
id: x-166970151413416000-0000000254 is leader: true
id: x-166970151413416001-0000000255 is leader: true
id: x-166970151413416002-0000000256 is leader: true
id: x-22854963329433647-0000000257 is leader: true
id: x-239027745716109355-0000000258 is leader: true
id: x-166970151413416003-0000000259 is leader: true
id: x-94912557367427124-0000000260 is leader: true
id: x-22854963329433648-0000000261 is leader: true
id: x-239027745716109356-0000000262 is leader: true
id: x-239027745716109357-0000000263 is leader: true
id: x-166970151413416004-0000000264 is leader: true
id: x-239027745716109358-0000000265 is leader: true
id: x-239027745716109359-0000000266 is leader: true
id: x-22854963329433649-0000000267 is leader: true
id: x-22854963329433650-0000000268 is leader: true
id: x-94912557367427125-0000000269 is leader: true
id: x-22854963329433651-0000000270 is leader: true
id: x-94912557367427126-0000000271 is leader: true
id: x-239027745716109360-0000000272 is leader: true
id: x-94912557367427127-0000000273 is leader: true
id: x-94912557367427128-0000000274 is leader: true
id: x-166970151413416005-0000000275 is leader: true
id: x-94912557367427129-0000000276 is leader: true
id: x-166970151413416006-0000000277 is leader: true
id: x-94912557367427130-0000000278 is leader: true
id: x-94912557367427131-0000000279 is leader: true
id: x-239027745716109361-0000000280 is leader: true
id: x-239027745716109362-0000000281 is leader: true
id: x-166970151413416007-0000000282 is leader: true
id: x-94912557367427132-0000000283 is leader: true
id: x-22854963329433652-0000000284 is leader: true
id: x-166970151413416008-0000000285 is leader: true
id: x-239027745716109363-0000000286 is leader: true
id: x-239027745716109364-0000000287 is leader: true
id: x-166970151413416009-0000000288 is leader: true
id: x-166970151413416010-0000000289 is leader: true
id: x-239027745716109365-0000000290 is leader: true
id: x-94912557367427133-0000000291 is leader: true
id: x-239027745716109366-0000000292 is leader: true
id: x-94912557367427134-0000000293 is leader: true
id: x-22854963329433653-0000000294 is leader: true
id: x-94912557367427135-0000000295 is leader: true
id: x-239027745716109367-0000000296 is leader: true
id: x-239027745716109368-0000000297 is leader: true

5 An upgraded version
A distributed lock solves synchronization between processes, but for requirements involving multi-thread + multi-process lock control, having every thread in a single JVM talk to ZooKeeper over the network gets expensive. So a two-layer distributed lock was built on top of DistributedLock.

The rough idea is a combination of ReentrantLock and DistributedLock:

When threads within one JVM contend, a thread must first take the first-layer ReentrantLock;
Holding that, the thread then contends with threads in other JVMs for the distributed lock; once it holds both, it runs its task;
Release runs in the reverse order: release the DistributedLock first, then the ReentrantLock. Consider the alternative: if the ReentrantLock were released first in a JVM with heavy ReentrantLock contention, contenders in other JVMs could easily be starved.

  1. DistributedReentrantLock.java source: the multi-process + multi-thread distributed lock
import java.text.MessageFormat;
import java.util.concurrent.locks.ReentrantLock;
 
import org.apache.zookeeper.KeeperException;
 
/**
 * Multi-process + multi-thread distributed lock
 */
public class DistributedReentrantLock extends DistributedLock {
 
    private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
    private ReentrantLock reentrantLock = new ReentrantLock();
 
    public DistributedReentrantLock(String root) {
        super(root);
    }
 
    public void lock() throws Exception {
        reentrantLock.lock();// under multi-thread contention, take the first-layer lock first
        super.lock();
    }
 
    public boolean tryLock() throws Exception {
        // under multi-thread contention, take the first-layer lock first
        return reentrantLock.tryLock() && super.tryLock();
    }
 
    public void unlock() throws KeeperException {
        super.unlock();
        reentrantLock.unlock();// release the outermost lock last
    }
 
    @Override
    public String getId() {
        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
    }
 
    @Override
    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
    }
}
  2. Test code:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
import org.apache.zookeeper.KeeperException;
 
/**
 * @version 1.0
 */
public class DistributedReentrantLockTest {
 
    public static void main(String [] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
 
        final DistributedReentrantLock lock = new DistributedReentrantLock("/locks"); // a single shared lock
        for (int i = 0; i < count; i++) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);
                        lock.lock();
                        Thread.sleep(100);
 
                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                        try {
                            lock.unlock();
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
 
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        executor.shutdown();
    }
}

6 Closing thoughts
Taking this one step further, a distributed read/write lock works on much the same principle. Roughly:

Contention identifiers: read_<sequence id> and write_<sequence id>;
Sort by sequence id. If the head of the queue consists of read markers, all of those reads acquire the lock together; if the head is a write marker, only that first write node acquires it;
Watches: a read watches the existence of the nearest write node ahead of it; a write watches the nearest node ahead of it (read or write).
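
The grant rule above can be sketched as a pure function. This is a hypothetical illustration (the class and method names `ReadWriteGrant`/`grantedCount` are invented here); it assumes children are named "read_<seq>" or "write_<seq>" and the input list is already sorted by sequence number:

```java
import java.util.List;

// Hypothetical sketch of the read/write grant rule described above.
public class ReadWriteGrant {

    // Returns how many nodes at the head of the queue hold the lock right now:
    // a leading run of reads all hold it together; a leading write holds it alone.
    public static int grantedCount(List<String> sortedChildren) {
        if (sortedChildren.isEmpty()) {
            return 0;
        }
        if (sortedChildren.get(0).startsWith("write_")) {
            return 1; // an exclusive writer at the head holds the lock alone
        }
        int n = 0;
        while (n < sortedChildren.size() && sortedChildren.get(n).startsWith("read_")) {
            n++; // every read ahead of the first write shares the lock
        }
        return n;
    }
}
```

For example, with the queue ["read_0001", "read_0002", "write_0003"], both reads hold the lock and the write waits on "read_0002"; with ["write_0001", "read_0002"], only the write holds it.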