1. 程式人生 > >實戰 -- Zookeeper實現分散式鎖

實戰 -- Zookeeper實現分散式鎖

場景分析

比如購買商品的操作,首先獲取商品的庫存,判斷庫存是否充足,充足的話建立訂單減庫存,不充足的話不允許建立訂單。

有一款新型膝上型電腦,庫存只剩下1件的時候,有客戶A、B從不同的客戶端(比如網站和APP)上看中了這款電腦,兩人同時進行下單操作。

A和B同時獲取庫存,A購買1件,庫存為1,判斷充足,建立訂單,庫存減1,B購買1件,庫存為1,判斷充足,建立訂單,庫存減1。

結果是僅剩1件的商品,被兩次下單,庫存變成了-1。顯然這種結果是錯誤的,A和B之間有一人的訂單無法發貨。

如何解決呢

可以用一個鎖,A在下單的時候,給庫存加上一個鎖,此時除了A以外,任何人都不能對庫存進行操作,B在獲取庫存的時候 ,由於A對庫存加上了鎖,所以B只好等待A釋放鎖之後在繼續。

A建立完訂單,對庫存減1,釋放鎖,B獲取鎖再繼續獲取庫存,此時庫存為0,判斷庫存不充足,無法建立訂單。保證了庫存僅剩1的商品只能被下單1次。

這是分散式鎖實現的一種方案。

什麼是分散式鎖

分散式鎖,是控制分散式系統或不同系統之間訪問共享資源的一種鎖的實現,其主要解決的問題就是保證資料一致性。

Zookeeper實現的分散式鎖,是利用節點的操作來進行的,加鎖,就是建立節點(臨時節點),解鎖就是刪除節點,同一個業務都是在同一父節點下進行加鎖和解鎖操作,如果該業務父節點下有子節點,則說明該業務已經被鎖住了,如果沒有子節點,則沒被加鎖。

臨時節點的特點是,當會話失效時,Zookeeper自動清除,避免獲取鎖的客戶端掉線後,沒有刪除鎖節點,而其他客戶端都在等這個鎖節點刪除,產生了死鎖。

實現分散式鎖的兩種方式

一、單節點鎖

在某一業務節點,只允許建立1個子節點,代表鎖,所有客戶端爭搶建立子節點許可權,搶到並建立,則加鎖成功,沒搶到,則等待機會。如圖所示:

圖1

  1. 客戶端準備加鎖的時候,檢視該業務節點下有沒有子節點,如果沒有,則建立節點,此客戶端獲得鎖,執行業務操作。
  2. 如果存在子節點,則代表當前業務被加鎖,此時客戶端掛起,監聽業務節點的子節點變化
  3. 客戶端獲取鎖並執行完業務之後,刪除該節點,Zookeeper通知其他客戶端,喚醒掛起,繼續嘗試建立節點。

二、多節點鎖

在單節點鎖中,所有客戶端都操作同一個節點,當只有鎖的客戶端釋放鎖時,其他的客戶端都從掛起狀態中喚醒,來競爭鎖。誰先獲取鎖與客戶端的網路狀態和Zookeeper的伺服器CPU排程等不可控因素有關,和誰先來後到的無關。

如果希望客戶端能按照先來後到的順序來獲取鎖,就需要用多節點鎖來實現,即每個客戶端在同一業務節點下建立專屬自己的順序節點,按照順序節點的序號,來決定誰獲取鎖。如圖:

多節點鎖

  1. 某個客戶端嘗試加鎖時,先在該業務節點下,建立一個順序節點
  2. 建立完成後,獲取出該業務節點下的所有子節點,並按照按照節點序號排序
  3. 判斷第一位的節點是否為自己的節點,是的話,代表獲取鎖,執行業務操作
  4. 不是的話,對排在自己前一位的節點進行監聽,客戶端掛起
  5. 當客戶端執行業務完畢後,刪除自己的節點,並通知監聽自己節點的客戶端進行業務操作。

多節點程式碼實現

類設計如下

圖三

  1. BusinessTypeEnum列舉,定義了業務的型別,用來區分不同業務,如果要對某個業務加鎖,就在BusinessTypeEnum定義的業務型別下建立節點
  2. CuatorExt介面,操作Zookeeper的客戶端,定義了一些操作方法
  3. AbstractCuatorExt類,客戶端CuatorExt介面方法的實現,規範了客戶端基本結構
  4. BaseDistributedLock類,繼承了AbstractCuatorExt,分散式鎖實現的核心,規範了分散式鎖結構,對它的子類公開獲取鎖的方法。
  5. DistributedLock介面,分散式鎖對外公開的介面,提供獲取鎖和釋放鎖兩種功能。
  6. DistributedLockImpl類是對DistributedLock介面的實現
  7. BuyService類,業務類。

BusinessTypeEnum

public enum  BusinessTypeEnum {
    items("/items"),
    orders("/orders");
    private String value;
    BusinessTypeEnum(String value){
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}

CuatorExt

public interface CuatorExt {
    /**
     * 建立臨時序列節點
     * @param basePath
     * @return
     */
    public String createEphemeralSequential(String basePath) throws Exception;

    /**
     * 刪除節點
     * @param ourPath
     */
    public void delete(String ourPath) throws Exception;

    /**
     * 獲取子節點
     * @param basePath
     * @return
     */
    public List<String> getChildren(String basePath) throws Exception;

}

BaseDistributedLock

public class BaseDistributedLock extends AbstractCuatorExt {
    private static final String NAME_SPACE="lock_namespace";
    private static final String DISTRIBUTED_LOCK = "/lock-";
    BaseDistributedLock(CuratorFramework client) {
        super(client);
    }
    private static final Integer MAX_RETRY_COUNT = 10;//重試次數
    public void init(){
        this.client = this.client.usingNamespace(NAME_SPACE);
        for(BusinessTypeEnum b : BusinessTypeEnum.values()){
            try {
                if(this.client.checkExists().forPath(b.getValue())==null){
                    this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(b.getValue());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 獲取鎖,並設定超時時間
     * @param time
     * @param timeUnit 
     * @param businessType
     * @return
     * @throws Exception
     */
    protected String attemptLock(long time, TimeUnit timeUnit, BusinessTypeEnum businessType) throws Exception {
        boolean goDone = false;
        String ourPath = null;
        String lockName = null;
        long startMillis = System.currentTimeMillis();
        int count = 0;
        while (!goDone) {
            goDone = true;
            try {
                ourPath = createEphemeralSequential(businessType.getValue()+DISTRIBUTED_LOCK);
                lockName = waitToLock(startMillis, time, timeUnit, businessType, ourPath);
            } catch (Exception e) {
                if (count++ < MAX_RETRY_COUNT) {
                    goDone = false;
                } else {
                    throw e;
                }
            }
        }
        return lockName;
    }

    private String waitToLock(long startMillis, long time, TimeUnit timeUnit, BusinessTypeEnum businessType, String ourPath) throws Exception {
        boolean haveLock = false;
        String lockName = null;
        Long waitMillis = timeUnit == null ? null : timeUnit.toMillis(time);
        boolean doDelete = false;
        try {
            while (!haveLock) {
                List<String> children = getChildrenAndSortThem(businessType.getValue());
                int index = children.indexOf(ourPath.substring(( businessType.getValue() + "/").length()));
                if (index < 0) {
                    throw new Exception("無此節點:" + ourPath);
                }
                if (index == 0) {
                    haveLock = true;
                    lockName = ourPath;
                } else {
                    String frontPath = children.get(index-1);
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    getClient().getData().usingWatcher(new CuratorWatcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) throws Exception {
                            countDownLatch.countDown();
                            lg.info(frontPath + "完成");
                        }
                    }).forPath(businessType.getValue()+"/"+frontPath);
                    if(waitMillis!=null){
                        waitMillis = System.currentTimeMillis() -  startMillis;
                        if(waitMillis>0){
                            lg.info(ourPath + "等待" + frontPath + "完成");
                            countDownLatch.await(waitMillis,timeUnit);
                        }else{
                            throw new Exception(ourPath+"等待超時");
                        }
                    }else{
                        lg.info(ourPath + "等待" + frontPath + "完成");
                        countDownLatch.await();
                    }
                    startMillis = System.currentTimeMillis();
                }
            }
        } catch (Exception e) {
            doDelete = true;
            throw e;
        }finally {
            if(doDelete){
                delete(ourPath);
            }
        }
        return lockName;
    }

    private List<String> getChildrenAndSortThem(String basePath) {
        List<String> children = null;
        try {
            children = getChildren(basePath);
            Collections.sort(children, new Comparator<String>() {
                @Override
                public int compare(String o1, String o2) {
                    return getLockNumber(o1, basePath.length()) - getLockNumber(o2, basePath.length());
                }

            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return children;
    }

    private int getLockNumber(String node, int suff) {
        node = node.substring(suff);
        return Integer.parseInt(node);
    }
}

AbstractCuatorExt

public class AbstractCuatorExt implements CuatorExt {
    final static Logger lg = LoggerFactory.getLogger(AbstractCuatorExt.class);
    public CuratorFramework client;
    AbstractCuatorExt(CuratorFramework client){
        this.client = client;
    }

    public CuratorFramework getClient() {
        return client;
    }

    @Override
    public String createEphemeralSequential(String basePath) throws Exception {
        String o = this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(basePath);
        return o;
    }

    @Override
    public void delete(String ourPath) throws Exception {
        this.client.delete().forPath(ourPath);
    }

    @Override
    public List<String> getChildren(String basePath) throws Exception {
        List<String> children = this.client.getChildren().forPath(basePath);
        return children;
    }
}

DistributedLock

/**
 * 分配鎖
 */
public  interface DistributedLock {

    /**獲取鎖,如果沒有得到就等待*/
    public  String acquire(BusinessTypeEnum businessType)  throws Exception;

    /**
     * 獲取鎖,直到超時
     * @param time 超時時間
     * @param unit time引數的單位
     * @return是否獲取到鎖
     * @throws Exception
     */
    public String acquire(BusinessTypeEnum businessType,long time, TimeUnit unit)  throws Exception;

    /**
     * 釋放鎖
     * @throws Exception
     */
    public void release(String lockName)  throws Exception;


}

DistributedLockImpl

public class DistributedLockImpl extends BaseDistributedLock implements DistributedLock {
    DistributedLockImpl(CuratorFramework client) {
        super(client);
    }

    @Override
    public String acquire(BusinessTypeEnum businessType) throws Exception {
        return attemptLock(0,null,businessType);
    }

    @Override
    public String acquire(BusinessTypeEnum businessType, long time, TimeUnit unit) throws Exception {
        return attemptLock(time,unit,businessType);
    }

    @Override
    public void release(String lockName) throws Exception {
        delete(lockName);
    }
}

BuyService

@Service("byService")
public class ByServiceImpl implements BuyService {
    static int i = 0;
    Logger lg = LoggerFactory.getLogger(ByServiceImpl.class);
    @Autowired
    OrderService orderService;
    @Autowired
    ItemService itemService;
    @Autowired
    DistributedLock distributedLock;
    @Override
    public String getLock(String name) {
        lg.info("開始獲取鎖");
        String lockName = null;
        try {
            lockName = distributedLock.acquire(BusinessTypeEnum.items);
            lg.info(lockName + "進行業務中:");
            Thread.sleep(30000);
            distributedLock.release(lockName);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

        lg.info(lockName+"釋放完畢");
        return lockName;
    }
}

Spring配置

<bean id="distributedLock" class="xin.youhuila.shopdobbo.web.lock.impl.DistributedLock" init-method="init">
   <constructor-arg ref="client"/>
</bean>
<bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
   <constructor-arg index="0" value="10"/>
   <constructor-arg index="1" value="5000"/>
</bean>
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
   <constructor-arg index="0" value="localhost:2181"/>
   <constructor-arg index="1" value="10000"/>
   <constructor-arg index="2" value="5000"/>
   <constructor-arg index="3" ref="retryPolicy"/>
</bean>

效果圖

圖4

單節點鎖程式碼實現

比較簡單,直接看程式碼吧

public class DistributedLock {
    private CuratorFramework client = null;
    final static Logger lg =LoggerFactory.getLogger(DistributedLockImooc.class);
    private static CountDownLatch zkLocklatch = new CountDownLatch(1);
    private static final String ZK_LOCK_PROJECT = "imooc-locks";
    private static final String DISTRIBUTED_LOCK = "distributed_lock";

    public DistributedLockImooc(CuratorFramework client) {
        this.client = client;
    }
    private void init(){
        client = client.usingNamespace("ZKLocks-Namespace");
        try {
            if(client.checkExists().forPath("/".concat(ZK_LOCK_PROJECT))==null){
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/"+ZK_LOCK_PROJECT);
            }
            addWatcherToLock("/"+ZK_LOCK_PROJECT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void addWatcherToLock(String path) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(client,path,true);
        cache.start( PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener(){

            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                    String path = event.getData().getPath();
                    lg.info("上一個會話已釋放鎖或斷開"+path);
                    lg.info("釋放計數器,讓當前請求來獲得分散式鎖");
                    zkLocklatch.countDown();

                }

            }
        });
    }

    public boolean releaseLock(){
        try {
            if(client.checkExists().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK)!=null){
                client.delete().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        lg.info("釋放成功");
        return true;
    }

    public void getLock(){
        int i = 0;
        while(true){
            try {
                client.create().creatingParentsIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
                lg.info("獲取分散式鎖成功");
                return;
            } catch (Exception e) {
                lg.info("獲取分散式鎖失敗");
                try {
                if(zkLocklatch.getCount()<=0){
                    zkLocklatch = new CountDownLatch(1);
                }
                    zkLocklatch.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                lg.info("第"+i+"嘗試此獲取鎖");
            }
        }
    }
}