1. 程式人生 > >Apache 開源的curator 基於Zookeeper實現分散式鎖以及原始碼分析

Apache 開源的curator 基於Zookeeper實現分散式鎖以及原始碼分析

前一段時間,我發表了一篇關於Redis實現分散式鎖 分散式環境下利用Redis實現分散式鎖,今天我帶領大家熟悉用zookeeper實現分散式鎖。

在學習分散式鎖之前,讓我們想一想,在什麼業務場景下會用到分散式鎖以及設計分散式鎖要注意什麼?

分散式鎖介紹

1、在什麼業務場景中會使用到分散式鎖

當多個客戶端訪問伺服器上同一個資源的時候,需要保證資料的一致性,比如秒殺系統,舉個栗子:

某件商品在系統中的數量是5件,當秒殺時間到來,會有大量的使用者搶購這件商品,瞬間會產生非常大的併發。正常的購買流程是:

step1、使用者下單

step2、判斷商品數量是否足夠

step3、如果足夠,庫存--

step4、如果庫存不夠,秒殺失敗。

假設此時商品只剩餘一件,使用者A對商品下單,商品數足夠,下單成功,系統還沒有來得及減庫存,使用者B也對同一件商品下單,此時商品數仍為1,最後導致系統會庫存減兩次,導致商品超賣現象。此時就需要對使用者下單-->減庫存的這一步操作進行加鎖,使操作成為原子操作。在單機、單程序環境下,使用JDK的ReentrantLcok或者synchronized完全足夠,但由於秒殺系統併發量極大,單機承受不了這樣的壓力極易宕機,此時就需要多臺伺服器、多程序支撐起這個業務,單機下的ReentrantLcok或者synchronized在此處毫無用武之地,此時就需要一把分散式鎖來保證某個時間段只有一個使用者訪問共享資源。

2、分散式鎖的注意事項

a、高效的獲取鎖和釋放鎖

b、在網路不穩定、中斷、宕機情況下要自動釋放鎖,防止自鎖

c、有阻塞鎖的特性,即使沒有獲取鎖,也會阻塞等待

d、具備非阻塞鎖特性,即沒有獲取到鎖,則直接返回獲取鎖失敗

e、具備可重入行,同一執行緒可多次獲得鎖

zookeeper實現分散式鎖

對於zookeeper,在此就不多介紹,我們可以利用zk的順序臨時節點這一特性來實現分散式鎖。思路如下:

1、獲取鎖時,在zk的目錄下建立一個節點,判斷該節點的需要在其兄弟節點中是否是最小的,若是最小的,則獲取鎖成功。

2、若不是最小的,則鎖已被佔用,需要對比自己小的節點註冊監聽器,如果鎖釋放,監聽到釋放鎖事件,判斷此時節點在其兄弟節點是不是最小的,如果是,獲取鎖。

下面我來介紹Apache 開源的curator 實現 Zookeeper 分散式鎖以及原始碼分析。

首先,匯入相關的maven

<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>2.10.0</version>
</dependency>

再看main方法

public class Application {

	private static String address = "192.168.1.100:2181";
	public static void main(String[] args) {
		//1、重試策略:初試時間為1s 重試3次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		//2、通過工廠建立連線
		CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
		//3、開啟連線
		client.start();
		//4 分散式鎖
		final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
		//讀寫鎖
		//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
		ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 5; i++) {
			fixedThreadPool.submit(new Runnable() {
				@Override
				public void run() {
					boolean flag = false;
					try {
						//嘗試獲取鎖,最多等待5秒
						flag = mutex.acquire(5, TimeUnit.SECONDS);
						Thread currentThread = Thread.currentThread();
						if(flag){
							System.out.println("執行緒"+currentThread.getId()+"獲取鎖成功");
						}else{
							System.out.println("執行緒"+currentThread.getId()+"獲取鎖失敗");
						}
						//模擬業務邏輯,延時4秒
						Thread.sleep(4000);
					} catch (Exception e) {
						e.printStackTrace();
					} finally{
						if(flag){
							try {
								mutex.release();
							} catch (Exception e) {
								e.printStackTrace();
							}
						}
					}
				}
			});
		}
	}
}

上面程式碼展示得知,

public boolean acquire(long time, TimeUnit unit)

方法是獲得鎖的方法,引數是自旋的時間,所以我們分析這個方法的原始碼。

public boolean acquire(long time, TimeUnit unit) throws Exception {
    return this.internalLock(time, unit);
}

可見,acquire呼叫的是internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        //獲得當前執行緒的鎖
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        //如果鎖不為空,當前執行緒已經獲得鎖,可重入鎖,lockCount++
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            //獲取鎖,返回鎖的節點路徑
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                //向當前鎖的map集合新增一個記錄
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;//獲取鎖失敗
            }
        }
    }

下面是threadData的資料結構,是一個Map結構,key是當前執行緒,value是當前執行緒和鎖的節點的一個封裝物件。

private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;

private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount;

        private LockData(Thread owningThread, String lockPath) {
            this.lockCount = new AtomicInteger(1);
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
}

由internalLock方法可看到,最重要的方法是attemptLock方法

String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        //將等待時間轉化為毫秒
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        //重試次數
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;

            try {
                //在當前path下建立臨時有序節點
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                //判斷是不是序號最小的節點,如果是返回true,否則阻塞等待
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }
        //返回當前鎖的節點路徑

        return hasTheLock ? ourPath : null;
    }

下面來看internalLockLoop方法,判斷是不是最小節點的方法

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
            //自旋
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                //獲得所有子節點
                List<String> children = this.getSortedChildren();
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                //判斷是否是最小節點
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                    //給比自己小的節點設定監聽器
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    //同步,是為了實現公平鎖
                    synchronized(this) {
                        try {
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            //如果等待時間==null,一直阻塞等待
                            if (millisToWait == null) {
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                //等待超時時間
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;//如果超時則刪除鎖
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                            ;
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                //如果鎖超時,刪除鎖
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }

釋放鎖:

public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        //當前執行緒沒有獲取鎖,不能釋放
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                } else {
                    //釋放鎖
                    try {
                        this.internals.releaseLock(lockData.lockPath);
                    } finally {
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }

好了,這就是我這次的分享內容,如有錯請提出,謝謝。

相關推薦

Apache 開源curator 基於Zookeeper實現分散式以及原始碼分析

前一段時間,我發表了一篇關於Redis實現分散式鎖 分散式環境下利用Redis實現分散式鎖,今天我帶領大家熟悉用zookeeper實現分散式鎖。 在學習分散式鎖之前,讓我們想一想,在什麼業務場景下會用到分散式鎖以及設計分散式鎖要注意什麼? 分散式鎖介紹 1、在什麼業務場

分散式學習筆記七:基於zookeeper實現分散式

一、分散式鎖介紹         分散式鎖主要用於在分散式環境中保護跨程序、跨主機、跨網路的共享資源實現互斥訪問,以達到保證資料的一致性。 二、架構介紹     &nb

C# 基於ZooKeeper實現分散式

主體思路 1. 在locks節點下建立臨時順序節點node_n2. 判斷當前建立的節點是否為locks節點下所有子節點中最小的子節點3. 是則獲取鎖,進行業務處理,否則將節點從小到大排序,監聽當前節點上一個節點的刪除事件4. 事件觸發後回到步驟2進行判斷,直至拿到鎖 程式碼塊分析 建構函

【原創】redis庫存操作,分散式的四種實現方式[連載一]--基於zookeeper實現分散式

一、背景 在電商系統中,庫存的概念一定是有的,例如配一些商品的庫存,做商品秒殺活動等,而由於庫存操作頻繁且要求原子性操作,所以絕大多數電商系統都用Redis來實現庫存的加減,最近公司專案做架構升級,以微服務的形式做分散式部署,對庫存的操作也單獨封裝為一個微服務,這樣在高併發情況下,加減庫存時,就會出現超賣等

基於zookeeper實現分散式

引言 在程式開發過程中不得不考慮的就是併發問題。在java中對於同一個jvm而言,jdk已經提供了lock和同步等。但是在分散式情況下,往往存在多個程序對一些資源產生競爭關係,而這些程序往往在不同的機器上,這個時候jdk中提供的已經不能滿足。分散式鎖顧明思議就是

基於zookeeper分散式實現

之前已經實現過基於redis的分散式鎖 這次用zookeeper來實現. 原理:ZooKeeper有四種形式的目錄節點,四種CreateMode PERSISTENT:持久化目錄節點,儲存的資料不會丟失。 PERSISTENT_SEQUENTIAL:順序自

基於Redis實現分散式

背景 在很多網際網路產品應用中,有些場景需要加鎖處理,比如:秒殺,全域性遞增ID,樓層生成等等。大部分的解決方案是基於DB實現的,Redis為單程序單執行緒模式,採用佇列模式將併發訪問變成序列訪問,且多客戶端對Redis的連線並不存在競爭關係。其次Redis提供一些命令SETNX,GETSET,可以方便

使用redis和zookeeper實現分散式

1.分散式鎖   分散式鎖一般用在分散式系統或者多個應用中,用來控制同一任務是否執行或者任務的執行順序。在專案中,部署了多個tomcat應用,在執行定時任務時就會遇到同一任務可能執行多次的情況,我們可以藉助分散式鎖,保證在同一時間只有一個tomcat應用執行了定時任務。 &nb

基於zookeeper分散式

實現分散式鎖目前有三種流行方案,分別為基於資料庫、Redis、Zookeeper的方案,其中前兩種方案網路上有很多資料可以參考,本文不做展開。我們來看下使用Zookeeper如何實現分散式鎖。 什麼是Zookeeper? Zookeeper(業界簡稱zk)是一種提供配置管

zookeeper實現分散式及 如何避免羊群效應

問題導讀: 1.zookeeper如何實現分散式鎖? 2.什麼是羊群效應? 3.zookeeper如何釋放鎖? 在zookeeper應用場景有關於分散式叢集配置檔案同步問題的描述,設想一下如果有100臺機器同時對同一臺機器上某個檔案進行修改,如何才能保證文字不會被寫亂,這就

zookeeper 實現分散式

 實現互斥鎖 package com.zookeeper.lock; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.uti

【原創】redis庫存操作,分散式的四種實現方式[連載二]--基於Redisson實現分散式

一、redisson介紹 redisson實現了分散式和可擴充套件的java資料結構,支援的資料結構有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。並且是執行緒安全的,底層使用N

ZooKeeper完全解析(七) 使用ZooKeeper實現分散式之Java實現

  在上一節中,我們講了使用ZooKeeper來實現分散式鎖的原理,連結為  ZooKeeper完全解析(六) 使用ZooKeeper實現分散式鎖之實現原理 ,這一節我們來講一下如何使用Java來實現分散式鎖:   在實現原理中,我們把使用ZooKeeper實現分散式鎖分成

zookeeper實現分散式

zookeeper介紹 一種提供配置管理、分散式協同以及命名的中心化服務 Zookeeper提供一個多層級的節點名稱空間(節點稱為znode), 每個節點都用一個以斜槓(/)分隔的路徑表示,而且每個節點都有父節點(根節點除外),非常類似於檔案系統。 例如,/fo

基於 Redis 實現分散式

什麼是Redis? Redis通常被稱為資料結構伺服器。這意味著Redis通過一組命令提供對可變資料結構的訪問,這些命令使用帶有TCP套接字和簡單協議的伺服器 - 客戶端模型傳送。因此,不同的程序可以以共享方式查詢和修改相同的資料結構。 Redis中實現的資料結構有一些特殊屬性:

Zookeeper 實現分散式(樂觀和悲觀)

說明: 做備忘用,大家之言彙總到一起。 Jar <!-- zkclient依賴 --> <dependency> <groupId>com.101tec</groupId> <art

10分鐘看懂!基於Zookeeper分散式

實現分散式鎖目前有三種流行方案,分別為基於資料庫、Redis、Zookeeper的方案,其中前兩種方案網路上有很多資料可以參考,本文不做展開。我們來看下使用Zookeeper如何實現分散式鎖。 什麼是Zookeeper? Zookeeper(業界簡稱zk)是一種提供配置管理、分散式協同以及命名的中心化

分散式原始碼剖析(4) zookeeper實現分散式

zookeeper分散式鎖(curator) maven配置檔案: <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes<

基於Redis實現分散式-Redisson使用及原始碼分析

在分散式場景下,有很多種情況都需要實現最終一致性。在設計遠端上下文的領域事件的時候,為了保證最終一致性,在通過領域事件進行通訊的方式中,可以共享儲存(領域模型和訊息的持久化資料來源),或者做全域性XA事務(兩階段提交,資料來源可分開),也可以藉助訊息中介軟體(消

基於ZooKeeper分散式和佇列

在分散式系統中,往往需要一些分散式同步原語來做一些協同工作,上一篇文章介紹了Zookeeper的基本原理,本文介紹下基於Zookeeper的Lock和Queue的實現,主要程式碼都來自Zookeeper的官方recipe。 鎖(Lock) 完全分散式鎖是全域性同步的,這意味著在任何時刻沒有兩個客戶端會同時