1. 程式人生 > >Zookeeper分散式鎖簡單實現(JAVA)

Zookeeper分散式鎖簡單實現(JAVA)

近段時間嘗試用視覺化介面把zookeeper的資料整合到公司的後臺系統中展示,進而查閱了資料研究zookeeper的使用,於是看著看著就手癢想用它的API實現一個簡單的分散式鎖.
本程式實現的分散式鎖適用於叢集單點故障,leader選舉等場景

下面先從一下幾個方面描述一下分散式鎖的概述,問題及程式解決思路

1. 概述

分散式鎖在一組程序之間提供了一種互斥機制。在任何時刻,在任何時刻只有一個程序可以持有鎖。分散式鎖可以在大型分散式系統中實現領導者選舉,在任何時間點,持有鎖的那個程序就是系統的領導者。

注意:不要將ZooKeeper自己的領導者選舉和使用了ZooKeeper

基本操作實現的一般領導者選混為一談。ZooKeeper自己的領導者選舉機制是對外不公開的,我們這裡所描述的一般領導者選舉服務則不同,他是對那些需要與主程序保持一致的分散式系統所設計的。

2. 原理

為了使用ZooKeeper來實現分散式鎖服務,我們使用順序znode來為那些競爭鎖的程序強制排序。

思路很簡單:

  1. 首先指定一個作為鎖的znode,通常用它來描述被鎖定的實體,稱為/distributed(或者其它任意名字均可);
  2. 然後希望獲得鎖的客戶端建立一些短暫順序znode,作為鎖znode的子節點。
  3. 在任何時間點,順序號最小的客戶端將持有鎖

例如,有兩個客戶端差不多同時建立znode,分別為/distributed/lock-1

/distributed/lock-2,那麼建立/distributed/lock-1的客戶端將會持有鎖,因為它的znode順序號最小。ZooKeeper服務是順序的仲裁者,因為它負責分配順序號。

  1. 通過刪除znode /leader/lock-l即可簡單地將鎖釋放;
  2. 另外,如果客戶端程序死亡,對應的短暫znode也會被刪除。
  3. 接下來,建立/leader/lock-2的客戶端將持有鎖,因為它順序號緊跟前一個。
  4. 通過建立一個關於znode刪除的watcher,可以使客戶端在獲得鎖時得到通知。

那麼針對以上的思路可以得到以下初步的演算法步驟:

  1. 在鎖znode下建立一個名為lock-的短暫順序znode,並且記住它的實際路徑名
    (create操作的返回值)。
  2. 查詢鎖znode的子節點並且設定一個觀察
  3. 如果步驟l中所建立的znode在步驟2中所返回的所有子節點中具有最小的順序號,則獲取到鎖。退出。
  4. 等待步驟2中所設觀察的通知並且轉到步驟2。

3. 問題及解決方案

上面的演算法還是比較初步的,仔細考慮的話裡面可能會有以下問題:
1. 羊群效應

問題描述:
雖然這個演算法是正確的,但還是存在一些問題。第一個問題是這種實現會受到羊群效應(herd effect)的影響。考慮有成百上千客戶端的情況,所有的客戶端都在嘗試獲得鎖,每個客戶端都會在鎖znode上設定一個觀察,用於捕捉子節點的變化。每次鎖被釋放或另外一個程序開始申請獲取鎖的時候,觀察都會被觸發並且每個客戶端都會收到一個通知。 羊群效應就是指大量客戶端收到同一事件的通知,但實際上只有很少一部分需要處理這一事件。在這種情況下,只有一個客戶端會成功地獲取鎖,但是維護過程及向所有客戶端傳送觀察事件會產生峰值流量,這會對ZooKeeper伺服器造成壓力。

解決方案:(本程式已實現)
為了避免出現羊群效應,我們需要優化通知的條件。關鍵在於只有在前一個順序號的子節點消失時才需要通知下一個客戶端,而不是刪除(或建立)任何子節點時都需要通知。在我們的例子中,如果客戶端建立了znode /distributed/lock-1、/distributed/lock-2和/distributed/lock-3,那麼只有當/distributed/lock-2消失時才需要通知/distributed/lock-3對照的客戶端;/distributed/lock-1消失或有新的znode /distributed/lock-4加入時,不需要通知該客戶端。
2. 可恢復的異常

問題描述:
這個申請鎖的演算法目前還存在另一個問題,就是不能處理因連線丟失而導致的create操作失敗。如前所述,在這種情況下,我們不知道操作是成功還是失敗。由於建立一個順序znode是非冪等操作,所以我們不能簡單地重試,因為如果第一次建立已經成功,重試會使我們多出一個永遠刪不掉的孤兒znode(至少到該客戶端會話結束前)。不幸的結果是將會出現死鎖(即該會話的第二個znode等待自己的第一個znode刪除)。

解決方案: (本程式未實現)

問題在於,在重新連線之後客戶端不能夠判斷它是否已經建立過子節點。解決方案是在znode的名稱中嵌入一個ID,如果客戶端出現連線丟失的情況,重新連線之後它便可以對鎖節點的所有於節點進行檢查,看看是否有子節點的名稱中包含其ID。如果有一個子節點的名稱包含其ID,它便知道建立操作已經成功,不需要再建立子節點。如果沒有子節點的名稱中包含其ID,則客戶端可以安全地建立一個新的順序子節點。

客戶端會話的ID是一個長整數,並且在ZooKeeper服務中是唯一的,因此非常適合在連線丟失後用於識別客戶端。可以通過呼叫Java ZooKeeper類的getSessionld()方法來獲得會話的ID

在建立短暫順序znode時應當採用lock-<sessionld>-這樣的命名方式,ZooKeeper在其尾部新增順序號之後,znode的名稱會形如lock-<sessionld>-<sequenceNumber>。由於順序號對於父節點來說是唯一的,但對於子節點名並不唯一,因此採用這樣的命名方式可以詿子節點在保持建立順序的同時能夠確定自己的建立者。

3 不可恢復的異常

描述
如果一個客戶端的ZooKeeper會話過期,那麼它所建立的短暫znode將會被刪除,已持有的鎖會被釋放,或是放棄了申請鎖的位置。使用鎖的應用程式應當意識到它已經不再持有鎖,應當清理它的狀態,然後通過建立並嘗試申請一個新的鎖物件來重新啟動。注意,這個過程是由應用程式控制的,而不是鎖,因為鎖是不能預知應用程式需要如何清理自己的狀態

具體的程式如下:

//LockException.java原始碼
package zookeeper.application.my.distributedlock;

/**
 * Created by chenyuzhi on 17-10-26.
 */
public class LockException extends RuntimeException {
    public LockException(String message) {
        super(message);
    }

    public LockException(Throwable cause) {
        super(cause);
    }
}
//Zknode.java原始碼
package zookeeper.application.my.distributedlock;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * Created by chenyuzhi on 17-10-26.
 */
public class ZkNode implements Watcher {
    private String id;
    private ZooKeeper zkClient;
    private String lockPath;
    private String currNodePath;
    private CountDownLatch latch;
    private DistributedLock lock;
    private final int sessionTimeout = 30000;

    public void doSomethingAsLeader(){
        try {
            lock.lock();
            System.out.println(MessageFormat.format("[{0}-{1}]: Now I am leader, all must follow!",
                    id,currNodePath));

            Thread.sleep(new Random().nextInt(3000)+500);

        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
            lock = null;
            clear();
        }


    }

    public ZkNode(String config,String lockPath,int id) {

        try {
            zkClient = new ZooKeeper(config, sessionTimeout, this);

            if(null == zkClient.exists(lockPath,false)){
                synchronized (ZkNode.class){
                    if(null == zkClient.exists(lockPath,false)){
                        zkClient.create(lockPath,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                    }
                }
            }

            this.lockPath = lockPath;
            this.id = "node" + id;
            this.currNodePath = zkClient.create(lockPath+"/lock",new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

            this.lock = new DistributedLock(this);


        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(null != latch){
            latch.countDown();
        }
    }

    public void clear(){

        try {
            if(null != zkClient) {
                zkClient.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public String getId() {
        return id;
    }

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }

    public String getLockPath() {
        return lockPath;
    }

    public String getCurrNodePath() {
        return currNodePath;
    }

    public ZooKeeper getZkClient() {
        return zkClient;
    }
    public CountDownLatch getLatch() {
        return latch;
    }
}
//DistributedLock.java原始碼
package zookeeper.application.my.distributedlock;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by chenyuzhi on 17-10-26.
 */
public class DistributedLock implements Lock {
    private final ZooKeeper zkClient;
    private final String lockNodePath;
    private final String currNodePath;
    private final ZkNode zkNode;
    private String waitNodePath;



    public DistributedLock(ZkNode zkNode) {
        this.lockNodePath = zkNode.getLockPath();
        this.currNodePath = zkNode.getCurrNodePath();
        this.zkClient = zkNode.getZkClient();
        this.zkNode = zkNode;
    }

    @Override
    public void lock() {
        if(tryLock()){
            return;
        }
        waitForLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            List<String> nodeChildren = zkClient.getChildren(lockNodePath,false);
            String currNodeName = this.currNodePath.substring(currNodePath.lastIndexOf("/") + 1);
            Collections.sort(nodeChildren);
            if(currNodeName.equals(nodeChildren.get(0)))
                return true;
            this.waitNodePath = this.lockNodePath + "/" +
                    nodeChildren.get(Collections.binarySearch(nodeChildren,currNodeName) - 1);
            return false;

        }catch (Exception e){
            throw new LockException(e);
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if(tryLock()){
            return true;
        }
        return waitForLock(time,unit);
    }

    @Override
    public void unlock() {
        try {
            if(null != zkClient.exists(currNodePath,false)){
                System.out.println(MessageFormat.format("[{0}-{1}]: Unlocking!",
                        zkNode.getId(),currNodePath));
                zkClient.delete(currNodePath,-1);
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }

    private boolean waitForLock(long time, TimeUnit unit){
        boolean retValue = false;
        try {
            Stat stat = zkClient.exists(this.waitNodePath,true); 

            if(stat != null){
                System.out.println(MessageFormat.format("[{0}-{1}]: waiting for  {2}!",
                        zkNode.getId(),currNodePath,waitNodePath));
                this.zkNode.setLatch(new CountDownLatch(1)); 
                retValue = this.zkNode.getLatch().await(time,unit);
                this.zkNode.setLatch(null);
            }
            return retValue;
        } catch (InterruptedException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        }
    }

    private boolean waitForLock(){

        try {
            Stat stat = zkClient.exists(this.waitNodePath,true);  //maybe has a bug because the continuity of thread executor

            if(stat != null){
                System.out.println(MessageFormat.format("[{0}-{1}]: waiting for  {2}!",
                        zkNode.getId(),currNodePath,waitNodePath));
                this.zkNode.setLatch(new CountDownLatch(1));  //maybe has a bug because the continuity of thread executor
                this.zkNode.getLatch().await();
                this.zkNode.setLatch(null);
            }
            return true;
        } catch (InterruptedException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
//ConcurrentTest.java原始碼
package zookeeper.application.my.distributedlock;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by chenyuzhi on 17-10-26.
 */
public class ConcurrentTest {
    private CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>();
    private CountDownLatch doneSignal;
    private AtomicInteger err = new AtomicInteger();//原子遞增
    public static void main(String[] args){
        new ConcurrentTest().test(15);
    }

    private void test(int nodeNum){
        doneSignal = new CountDownLatch(nodeNum);
        for(int i=0;i<nodeNum;i++){
            final int  index = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ZkNode zkNode = new ZkNode("127.0.0.1:2183","/distributed",index);
                    long start = System.currentTimeMillis();
                    zkNode.doSomethingAsLeader();
                    long end = (System.currentTimeMillis() - start);
                    list.add(end);
                    doneSignal.countDown();
                }
            }).start();
        }

        try {
            doneSignal.await();
            getExeTime();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 計算平均響應時間
     */
    private void getExeTime() {
        int size = list.size();
        List<Long> _list = new ArrayList<Long>(size);
        _list.addAll(list);
        Collections.sort(_list);
        long min = _list.get(0);
        long max = _list.get(size-1);
        long sum = 0L;
        for (Long t : _list) {
            sum += t;
        }
        long avg = sum/size;
        System.out.println("min: " + min);
        System.out.println("max: " + max);
        System.out.println("avg: " + avg);

    }
}

需要的maven依賴:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

執行結果:

zookeeper.application.my.distributedlock.ConcurrentTest
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[node3-/distributed/lock0000000017]: Now I am leader, all must follow!
[node14-/distributed/lock0000000023]: waiting for  /distributed/lock0000000022!
[node5-/distributed/lock0000000021]: waiting for  /distributed/lock0000000020!
[node8-/distributed/lock0000000025]: waiting for  /distributed/lock0000000024!
[node4-/distributed/lock0000000028]: waiting for  /distributed/lock0000000027!
[node1-/distributed/lock0000000019]: waiting for  /distributed/lock0000000018!
[node9-/distributed/lock0000000029]: waiting for  /distributed/lock0000000028!
[node0-/distributed/lock0000000022]: waiting for  /distributed/lock0000000021!
[node10-/distributed/lock0000000030]: waiting for  /distributed/lock0000000029!
[node7-/distributed/lock0000000026]: waiting for  /distributed/lock0000000025!
[node6-/distributed/lock0000000031]: waiting for  /distributed/lock0000000030!
[node12-/distributed/lock0000000027]: waiting for  /distributed/lock0000000026!
[node11-/distributed/lock0000000024]: waiting for  /distributed/lock0000000023!
[node2-/distributed/lock0000000018]: waiting for  /distributed/lock0000000017!
[node13-/distributed/lock0000000020]: waiting for  /distributed/lock0000000019!
[node3-/distributed/lock0000000017]: Unlocking!
[node2-/distributed/lock0000000018]: Now I am leader, all must follow!
[node2-/distributed/lock0000000018]: Unlocking!
[node1-/distributed/lock0000000019]: Now I am leader, all must follow!
[node1-/distributed/lock0000000019]: Unlocking!
[node13-/distributed/lock0000000020]: Now I am leader, all must follow!
[node13-/distributed/lock0000000020]: Unlocking!
[node5-/distributed/lock0000000021]: Now I am leader, all must follow!
[node5-/distributed/lock0000000021]: Unlocking!
[node0-/distributed/lock0000000022]: Now I am leader, all must follow!
[node0-/distributed/lock0000000022]: Unlocking!
[node14-/distributed/lock0000000023]: Now I am leader, all must follow!
[node14-/distributed/lock0000000023]: Unlocking!
[node11-/distributed/lock0000000024]: Now I am leader, all must follow!
[node11-/distributed/lock0000000024]: Unlocking!
[node8-/distributed/lock0000000025]: Now I am leader, all must follow!
[node8-/distributed/lock0000000025]: Unlocking!
[node7-/distributed/lock0000000026]: Now I am leader, all must follow!
[node7-/distributed/lock0000000026]: Unlocking!
[node12-/distributed/lock0000000027]: Now I am leader, all must follow!
[node12-/distributed/lock0000000027]: Unlocking!
[node4-/distributed/lock0000000028]: Now I am leader, all must follow!
[node4-/distributed/lock0000000028]: Unlocking!
[node9-/distributed/lock0000000029]: Now I am leader, all must follow!
[node9-/distributed/lock0000000029]: Unlocking!
[node10-/distributed/lock0000000030]: Now I am leader, all must follow!
[node10-/distributed/lock0000000030]: Unlocking!
[node6-/distributed/lock0000000031]: Now I am leader, all must follow!
[node6-/distributed/lock0000000031]: Unlocking!
min: 1032
max: 31075
avg: 16046

Process finished with exit code 0

在程式中我在兩處地方標註了可能出現bug的地方,這是因為:
1. 若當前執行緒執行到

Stat stat = zkClient.exists(this.waitNodePath,true);

後,當前執行緒退出資源的佔用,然後恰巧此時它剛才設定觀察的znode deletele了,那麼後面的

this.zkNode.setLatch(new CountDownLatch(1)); 
this.zkNode.getLatch().await();

兩個語句執行後將導致當前執行緒一直出於阻塞狀態,因為設定觀察的znode deletele了,不會有watcher的回撥執行latch.countDown();

  1. 若當前執行緒執行到
this.zkNode.setLatch(new CountDownLatch(1));

後,當前執行緒退出資源的佔用,然後恰巧此時它剛才設定觀察的znode deletele了,那麼也是同樣的原因:

this.zkNode.getLatch().await();

該語句執行後將導致當前執行緒一直出於阻塞狀態,因為設定觀察的znode deletele了,不會有watcher的回撥執行latch.countDown();

解決方法
最好的解決辦法是採取類似於傳統資料的事務形式去強制此段程式碼的連續執行(即一致性),使其要麼同時成功,要麼同時失敗,但是在本例中暫時還找不到利用比較簡單的邏輯是去實現這個想法的方法.

有人說可以利用同步鎖去實現(比如synchronized),但是在分散式情況下,你不能保證當前執行緒等待的前面znode對應的節點突然掛掉,導致回撥仍然可能會發生以上兩處地方

由於小弟程式設計能力尚淺,有些特殊情況的轉換沒能考慮好,希望各位可以提出,或者貼出更完善的解析程式供大家分享,在此處權當拋磚引玉了。