1. 程式人生 > >分散式學習筆記五:redis分步式鎖

分散式學習筆記五:redis分步式鎖

前言

分散式鎖一般有三種實現方式:1. 資料庫樂觀鎖;2. 基於Redis的分散式鎖;3. 基於ZooKeeper的分散式鎖。本篇部落格將介紹第二種方式,基於Redis實現分散式鎖。雖然網上已經有各種介紹Redis分散式鎖實現的部落格,然而他們的實現卻有著各種各樣的問題,本篇部落格將詳細介紹如何正確地實現Redis分散式鎖。
需滿足如下條件:
-互斥性。在任意時刻,只有一個客戶端能持有鎖。 
- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖。 
- 具有容錯性。只要大部分的Redis節點正常執行,客戶端就可以加鎖和解鎖。 
- 解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

加鎖

正確方式:

首先我們要通過Maven引入Jedis開源元件,在pom.xml檔案加入下面的程式碼:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>


注:如果在springBoot,可以使用springBoot自己封裝的redisTemplate.自行百度。

加鎖程式碼:

public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 嘗試獲取分散式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識 (可通過UUID生成唯一標識)
     * @param expireTime 超期時間
     * @return 是否獲取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}


錯誤例項1: 
比較常見的錯誤示例就是使用jedis.setnx()和jedis.expire()組合實現加鎖,程式碼如下:

public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {

    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        // 若在這裡程式突然崩潰,則無法設定過期時間,將發生死鎖
        jedis.expire(lockKey, expireTime);
    }
}


setnx()方法作用就是SET IF NOT EXIST,expire()方法就是給鎖加一個過期時間。乍一看好像和前面的set()方法結果一樣,然而由於這是兩條Redis命令,不具有原子性,如果程式在執行完setnx()之後突然崩潰,導致鎖沒有設定過期時間。那麼將會發生死鎖。網上之所以有人這樣實現,是因為低版本的jedis並不支援多引數的set()方法。

解鎖程式碼

正確方式:

public class RedisTool {

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 釋放分散式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @return 是否釋放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}


可以看到,我們解鎖只需要兩行程式碼就搞定了!第一行程式碼,我們寫了一個簡單的Lua指令碼程式碼,第二行程式碼,我們將Lua程式碼傳到jedis.eval()方法裡,並使引數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua程式碼交給Redis服務端執行。那麼這段Lua程式碼的功能是什麼呢?其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那麼為什麼要使用Lua語言來實現呢?因為要確保上述操作是原子性的。簡單來說,就是在eval命令執行Lua程式碼的時候,Lua程式碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命令。

錯誤示例1:

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}


最常見的解鎖程式碼就是直接使用jedis.del()方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的。

錯誤示例2:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {

    // 判斷加鎖與解鎖是不是同一個客戶端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
        jedis.del(lockKey);
    }
}


如程式碼註釋,問題在於如果呼叫jedis.del()方法的時候,這把鎖已經不屬於當前客戶端的時候會解除他人加的鎖。那麼是否真的有這種場景?答案是肯定的,比如客戶端A加鎖,一段時間之後客戶端A解鎖,在執行jedis.del()之前,鎖突然過期了,此時客戶端B嘗試加鎖成功,然後客戶端A再執行del()方法,則將客戶端B的鎖給解除了。
 

springBoot中實現

首先需要引入redisson

        <!--redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--redis分散式鎖-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.4.3</version>
        </dependency>



生成Redisson的bean 
支援單機,主從,哨兵,叢集等模式,具體方式請參考https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95,這裡只演示叢集環境。

 @Bean
    Redisson redissonSentinel() {
        Config config = new Config();
        config.useClusterServers()
                .setScanInterval(2000) // 叢集狀態掃描間隔時間,單位是毫秒
                //可以用"rediss://"來啟用SSL連線
                .addNodeAddress("redis://10.82.0.102:7000")
                .addNodeAddress("redis://10.82.0.102:7001")
                .addNodeAddress("redis://10.82.0.102:7002")
                .addNodeAddress("redis://10.82.0.102:7003")
                .addNodeAddress("redis://10.82.0.102:7004")
                .addNodeAddress("redis://10.82.0.102:7005");
        return (Redisson)Redisson.create(config);
    }


這裡只是簡單展示,配置更加詳細的,參考上面網站。

簡單使用實現:   

@Autowired
    Redisson redisson;


    RLock lock = redisson.getLock(key);
    lock.lock(60, TimeUnit.SECONDS); //設定60秒自動釋放鎖  (預設是30秒自動過期)

    //執行的業務程式碼

    lock.unlock(); //釋放鎖


關於Redisson 更加全面詳細鎖的情況,前往:https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8

就這樣通過redisson就實現redis分散式鎖,內部幫我們解決了上一篇提到的注意的地方。使用redisson更加體現一切皆物件,我們不需要知道內部如何實現,只需知道如何使用就行。當然作為一個積極進取的程式設計師還是要了解底層實現的。

原理簡介

Redisson 分散式鎖實現分析

java.util.concurrent.locks 中包含了 JDK 提供的在多執行緒情況下對共享資源的訪問控制的一系列工具,它們可以幫助我們解決程序內多執行緒併發時的資料一致性問題。

但是在分散式系統中,JDK 原生的併發鎖工具在一些場景就無法滿足我們的要求了,這就是為什麼要使用分散式鎖。我總結了一句話,分散式鎖是用於解決分散式系統中操作共享資源時的資料一致性問題。

設計分散式鎖要注意的問題

互斥

分散式系統中執行著多個節點,必須確保在同一時刻只能有一個節點的一個執行緒獲得鎖,這是最基本的一點。

死鎖

分散式系統中,可能產生死鎖的情況要相對複雜一些。分散式系統是處在複雜網路環境中的,當一個節點獲取到鎖,如果它在釋放鎖之前掛掉了,或者因網路故障無法執行釋放鎖的命令,都會導致其他節點無法申請到鎖。

因此分散式鎖有必要設定時效,確保在未來的一定時間內,無論獲得鎖的節點發生了什麼問題,最終鎖都能被釋放掉。

效能

對於訪問量大的共享資源,如果針對其獲取鎖時造成長時間的等待,導致大量節點阻塞,是絕對不能接受的。

所以設計分散式鎖時要能夠掌握鎖持有者的動態,若判斷鎖持有者處於不活動狀態,要能夠強制釋放其持有的鎖。
此外,排隊等待鎖的節點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖,這樣無法保證資源的高效利用,因此當鎖釋放時,要能夠通知等待佇列,使一個等待節點能夠立刻獲得鎖。

重入

考慮到一些應用場景和資源的高效利用,鎖要設計成可重入的,就像 JDK 中的 ReentrantLock 一樣,同一個執行緒可以重複拿到同一個資源的鎖。

RedissonLock 實現解讀

本文中 Redisson 的程式碼版本為 2.2.17-SNAPSHOT。

這裡以 lock() 方法為例,其他一系列方法與其核心實現基本一致。

先來看 lock() 的基本用法

RLock lock = redisson.getLock("foobar"); // 1.獲得鎖物件例項
lock.lock(); // 2.獲取分散式鎖
try {
    // do sth.
} finally {
    lock.unlock(); // 3.釋放鎖
}
  1. 通過 RedissonClient 的 getLock() 方法取得一個 RLock 例項。
  2. lock() 方法嘗試獲取鎖,如果成功獲得鎖,則繼續往下執行,否則等待鎖被釋放,然後再繼續嘗試獲取鎖,直到成功獲得鎖。
  3. unlock() 方法釋放獲得的鎖,並通知等待的節點鎖已釋放。

下面來看看 RedissonLock 的具體實現

org.redisson.Redisson#getLock()

@Override
public RLock getLock(String name) {
  return new RedissonLock(commandExecutor, name, id);
}

這裡的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個 interface,getLock 返回的實際上是其實現類 RedissonLock 的例項。

來看看構造 RedissonLock 的引數

  • commandExecutor: 與 Redis 節點通訊併發送指令的真正實現。需要說明一下,Redisson 預設的 CommandExecutor 實現是通過 eval 命令來執行 Lua 指令碼,所以要求 Redis 的版本必須為 2.6 或以上,否則你可能要自己來實現 CommandExecutor。關於 Redisson 的 CommandExecutor 以後會專門解讀,所以本次就不多說了。
  • name: 鎖的全域性名稱,例如上面程式碼中的 "foobar",具體業務中通常可能使用共享資源的唯一標識作為該名稱。
  • id: Redisson 客戶端唯一標識,實際上就是一個 UUID.randomUUID()

org.redisson.RedissonLock#lock()

此處略過前面幾個方法的層層呼叫,直接看最核心部分的方法 lockInterruptibly(),該方法在 RLock 中宣告,支援對獲取鎖的執行緒進行中斷操作。在直接使用 lock() 方法獲取鎖時,最後實際執行的是 lockInterruptibly(-1, null)

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 1.嘗試獲取鎖
    Long ttl = tryAcquire(leaseTime, unit);
    // 2.獲得鎖成功
    if (ttl == null) {
        return;
    }
    // 3.等待鎖釋放,並訂閱鎖
    long threadId = Thread.currentThread().getId();
    Future<RedissonLockEntry> future = subscribe(threadId);
    get(future);

    try {
        while (true) {
            // 4.重試獲取鎖
            ttl = tryAcquire(leaseTime, unit);
            // 5.成功獲得鎖
            if (ttl == null) {
                break;
            }
            // 6.等待鎖釋放
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        // 7.取消訂閱
        unsubscribe(future, threadId);
    }
}
  1. 首先嚐試獲取鎖,具體程式碼下面再看,返回結果是已存在的鎖的剩餘存活時間,為 null 則說明沒有已存在的鎖併成功獲得鎖。
  2. 如果獲得鎖則結束流程,回去執行業務邏輯。
  3. 如果沒有獲得鎖,則需等待鎖被釋放,並通過 Redis 的 channel 訂閱鎖釋放的訊息,這裡的具體實現本文也不深入,只是簡單提一下 Redisson 在執行 Redis 命令時提供了同步非同步的兩種實現,但實際上同步的實現都是基於非同步的,具體做法是使用 Netty 中的非同步工具 FutureFutureListener 結合 JDK 中的 CountDownLatch 一起實現。
  4. 訂閱鎖的釋放訊息成功後,進入一個不斷重試獲取鎖的迴圈,迴圈中每次都先試著獲取鎖,並得到已存在的鎖的剩餘存活時間。
  5. 如果在重試中拿到了鎖,則結束迴圈,跳過第 6 步。
  6. 如果鎖當前是被佔用的,那麼等待釋放鎖的訊息,具體實現使用了 JDK 併發的訊號量工具 Semaphore 來阻塞執行緒,當鎖釋放併發布釋放鎖的訊息後,訊號量的 release() 方法會被呼叫,此時被訊號量阻塞的等待佇列中的一個執行緒就可以繼續嘗試獲取鎖了。
  7. 在成功獲得鎖後,就沒必要繼續訂閱鎖的釋放訊息了,因此要取消對 Redis 上相應 channel 的訂閱。

下面著重看看 tryAcquire() 方法的實現,

private Long tryAcquire(long leaseTime, TimeUnit unit) {
    // 1.將非同步執行的結果以同步的形式返回
    return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}

private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 2.用預設的鎖超時時間去獲取鎖
    Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
                TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // 成功獲得鎖
            if (ttlRemaining == null) {
                // 3.鎖過期時間重新整理任務排程
                scheduleExpirationRenewal();
            }
        }
    });
    return ttlRemainingFuture;
}

<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
                RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 3.使用 EVAL 命令執行 Lua 指令碼獲取鎖
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime,
                        getLockName(threadId));
}
  1. 上面說過 Redisson 實現的執行 Redis 命令都是非同步的,但是它在非同步的基礎上提供了以同步的方式獲得執行結果的封裝
  2. 前面提到分散式鎖要確保未來的一段時間內鎖一定能夠被釋放,因此要對鎖設定超時釋放的時間,在我們沒有指定該時間的情況下,Redisson 預設指定為30秒。
  3. 在成功獲取到鎖的情況下,為了避免業務中對共享資源的操作還未完成,鎖就被釋放掉了,需要定期(鎖失效時間的三分之一)重新整理鎖失效的時間,這裡 Redisson 使用了 Netty 的 TimerTaskTimeout 工具來實現該任務排程。
  4. 獲取鎖真正執行的命令,Redisson 使用 EVAL 命令執行上面的 Lua 指令碼來完成獲取鎖的操作:
  5. 如果通過 exists 命令發現當前 key 不存在,即鎖沒被佔用,則執行 hset 寫入 Hash 型別資料 key:全域性鎖名稱(例如共享資源ID), field:鎖例項名稱(Redisson客戶端ID:執行緒ID), value:1,並執行 pexpire 對該 key 設定失效時間,返回空值 nil,至此獲取鎖成功。
  6. 如果通過 hexists 命令發現 Redis 中已經存在當前 key 和 field 的 Hash 資料,說明當前執行緒之前已經獲取到鎖,因為這裡的鎖是可重入的,則執行 hincrby 對當前 key field 的值加一,並重新設定失效時間,返回空值,至此重入獲取鎖成功。
  7. 最後是鎖已被佔用的情況,即當前 key 已經存在,但是 Hash 中的 Field 與當前值不同,則執行 pttl 獲取鎖的剩餘存活時間並返回,至此獲取鎖失敗。

以上就是對 lock() 的解讀,不過在實際業務中我們可能還會經常使用 tryLock(),雖然兩者有一定差別,但核心部分的實現都是相同的,另外還有其他一些方法可以支援更多自定義引數,本文中就不一一詳述了。

org.redisson.RedissonLock#unlock()

最後來看鎖的釋放,

@Override
public void unlock() {
    // 1.通過 EVAL 和 Lua 指令碼執行 Redis 命令釋放鎖
    Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                    RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), 
                            LockPubSub.unlockMessage, internalLockLeaseTime, 
                            getLockName(Thread.currentThread().getId()));
    // 2.非鎖的持有者釋放鎖時丟擲異常
    if (opStatus == null) {
        throw new IllegalMonitorStateException(
                "attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    // 3.釋放鎖後取消重新整理鎖失效時間的排程任務
    if (opStatus) {
        cancelExpirationRenewal();
    }
}
  1. 使用 EVAL 命令執行 Lua 指令碼來釋放鎖:
  2. key 不存在,說明鎖已釋放,直接執行 publish 命令釋出釋放鎖訊息並返回 1
  3. key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,返回 nil
  4. 因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行 hincrby 對鎖的值減一
  5. 釋放一把鎖後,如果還有剩餘的鎖,則重新整理鎖的失效時間並返回 0;如果剛才釋放的已經是最後一把鎖,則執行 del 命令刪除鎖的 key,併發布鎖釋放訊息,返回 1
  6. 上面執行結果返回 nil 的情況(即第2中情況),因為自己不是鎖的持有者,不允許釋放別人的鎖,故丟擲異常。
  7. 執行結果返回 1 的情況,該鎖的所有例項都已全部釋放,所以不需要再重新整理鎖的失效時間。

總結

寫了這麼多,其實最主要的就是上面的兩段 Lua 指令碼,基於 Redis 的分散式鎖的設計完全體現在其中,看完這兩段指令碼,再回顧一下前面的 設計分散式鎖要注意的問題 就豁然開朗了。