1. 程式人生 > >分布式鎖實現大型連續劇之(一):Redis

分布式鎖實現大型連續劇之(一):Redis

set 但是 sss channel 時有 commands 阻塞 iss cond

前言:

單機環境下我們可以通過JAVA的Synchronized和Lock來實現進程內部的鎖,但是隨著分布式應用和集群環境的出現,系統資源的競爭從單進程多線程的競爭變成了多進程的競爭,這時候就需要分布式鎖來保證。
實現分布式鎖現在主流的方式大致有以下三種

  1. 基於數據庫的索引和行鎖
  2. 基於Redis的單線程原子操作:setNX
  3. 基於Zookeeper的臨時有序節點

這篇文章我們用Redis來實現,會基於現有的各種鎖實現來分析,最後分享Redission的鎖源碼分析來看下分布式鎖的開源實現

設計實現

加鎖

一、 通過setNx和getSet來實現

這是現在網上大部分版本的實現方式,筆者之前項目裏面用到分布式鎖也是通過這樣的方式實現

public boolean lock(Jedis jedis, String lockName, Integer expire) {

 //返回是否設置成功
 //setNx加鎖
 long now = System.currentTimeMillis();
 boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;

 if (!result) {
     //防止死鎖的容錯
     String timestamp = jedis.get(lockName);
     if (timestamp != null && Long.parseLong(timestamp) < now) {
         //不通過del方法來刪除鎖。而是通過同步的getSet
         String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
         if (oldValue != null && oldValue.equals(timestamp)) {
             result = true;
             jedis.expire(lockName, expire);
         }
     }
 }
 if (result) {
     jedis.expire(lockName, expire);
 }
 return result;

}

代碼分析:

通過setNx命令老保證操作的原子性,獲取到鎖,並且把過期時間設置到value裏面

通過expire方法設置過期時間,如果設置過期時間失敗的話,再通過value的時間戳來和當前時間戳比較,防止出現死鎖

通過getSet命令在發現鎖過期未被釋放的情況下,避免刪除了在這個過程中有可能被其余的線程獲取到了鎖

存在問題

防止死鎖的解決方案是通過系統當前時間決定的,不過線上服務器系統時間一般來說都是一致的,這個不算是嚴重的問題
鎖過期的時候可能會有多個線程執行getSet命令,在競爭的情況下,會修改value的時間戳,理論上來說會有誤差
鎖無法具備客戶端標識,在解鎖的時候可能被其余的客戶端刪除同一個key

雖然有小問題,不過大體上來說這種分布式鎖的實現方案基本上是符合要求的,能夠做到鎖的互斥和避免死鎖

二、 通過Redis高版本的原子命令

jedis的set命令可以自帶復雜參數,通過這些參數可以實現原子的分布式鎖命令

jedis.set(lockName, "", "NX", "PX", expireTime);
復制代碼代碼分析

redis的set命令可以攜帶復雜參數,第一個是鎖的key,第二個是value,可以存放獲取鎖的客戶端ID,通過這個校驗是否當前客戶端獲取到了鎖,第三個參數取值NX/XX,第四個參數 EX|PX,第五個就是時間

NX:如果不存在就設置這個key XX:如果存在就設置這個key

EX:單位為秒,PX:單位為毫秒

這個命令實質上就是把我們之前的setNx和expire命令合並成一個原子操作命令,不需要我們考慮set失敗或者expire失敗的情況

解鎖

一、 通過Redis的del命令
public boolean unlock(Jedis jedis, String lockName) {
jedis.del(lockName);
return true;
}
代碼分析
通過redis的del命令可以直接刪除鎖,可能會出現誤刪其他線程已經存在的鎖的情況
二、 Redis的del檢查
public static void unlock2(Jedis jedis, String lockKey, String requestId) {

// 判斷加鎖與解鎖是不是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
jedis.del(lockKey);
}

}
代碼分析
新增了requestId客戶端ID的判斷,但由於不是原子操作,在多個進程下面的並發競爭情況下,無法保證安全
三、 Redis的LUA腳本
public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {

  String script = "if redis.call(‘get‘, KEYS[1]) == ARGV[1] then return redis.call(‘del‘, KEYS[1]) else return 0 end";
  Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));

  if (1L == (long) result) {
      return true;
  }
  return false;

}
代碼分析
通過Lua腳本來保證操作的原子性,其實就是把之前的先判斷再刪除合並成一個原子性的腳本命令,邏輯就是,先通過get判斷value是不是相等,若相等就刪除,否則就直接return

Redission的分布式鎖

Redission是redis官網推薦的一個redis客戶端,除了基於redis的基礎的CURD命令以外,重要的是就是Redission提供了方便好用的分布式鎖API
一、 基本用法
RedissonClient redissonClient = RedissonTool.getInstance();

    RLock distribute_lock = redissonClient.getLock("distribute_lock");

    try {
        boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);

    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (distribute_lock.isLocked()) {
            distribute_lock.unlock();
        }
    }

代碼流程

通過redissonClient獲取RLock實例
tryLock獲取嘗試獲取鎖,第一個是等待時間,第二個是鎖的超時時間,第三個是時間單位
執行完業務邏輯後,最終釋放鎖

二、 具體實現
我們通過tryLock來分析redission分布式的實現,lock方法跟tryLock差不多,只不過沒有最長等待時間的設置,會自旋循環等待鎖的釋放,直到獲取鎖為止
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
//獲取當前線程ID,用於實現可重入鎖
final long threadId = Thread.currentThread().getId();
//嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}

    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        //等待時間結束,返回獲取失敗
        acquireFailed(threadId);
        return false;
    }

    current = System.currentTimeMillis();
    //訂閱鎖的隊列,等待鎖被其余線程釋放後通知
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            // waiting for message,等待訂閱的隊列消息
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        unsubscribe(subscribeFuture, threadId);
    }

代碼分析
首先tryAcquire嘗試獲取鎖,若返回ttl為null,說明獲取到鎖了

判斷等待時間是否過期,如果過期,直接返回獲取鎖失敗

通過Redis的Channel訂閱監聽隊列,subscribe內部通過信號量semaphore,再通過await方法阻塞,內部其實是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果,來保證訂閱成功,再判斷是否到了等待時間

再次嘗試申請鎖和等待時間的判斷,循環阻塞在這裏等待鎖釋放的消息RedissonLockEntry也維護了一個semaphore的信號量

無論是否釋放鎖,最終都要取消訂閱這個隊列消息

redission內部的getEntryName是客戶端實例ID+鎖名稱來保證多個實例下的鎖可重入

tryAcquire獲取鎖

redisssion獲取鎖的核心代碼,內部其實是異步調用,但是用get方法阻塞了

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {@Override
br/>@Override
if (!future.isSuccess()) {
return;
}

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync方法內部是基於Lua腳本來獲取鎖的
先判斷KEYS[1](鎖名稱)對應的key是否存在,不存在獲取到鎖,hset設置key的value,pexpire設置過期時間,返回null表示獲取到鎖
存在的話,鎖被占,hexists判斷是否是當前線程的鎖,若是的話,hincrby增加重入次數,重新設置過期時間,不是當前線程的鎖,返回當前鎖的過期時間
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
                  "redis.call(‘hset‘, KEYS[1], ARGV[2], 1); " +
                  "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call(‘hincrby‘, KEYS[1], ARGV[2], 1); " +
                  "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call(‘pttl‘, KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

Redission避免死鎖的解決方案:
Redission為了避免鎖未被釋放,采用了一個特殊的解決方案,若未設置過期時間的話,redission默認的過期時間是30s,同時未避免鎖在業務未處理完成之前被提前釋放,Redisson在獲取到鎖且默認過期時間的時候,會在當前客戶端內部啟動一個定時任務,每隔internalLockLeaseTime/3的時間去刷新key的過期時間,這樣既避免了鎖提前釋放,同時如果客戶端宕機的話,這個鎖最多存活30s的時間就會自動釋放(刷新過期時間的定時任務進程也宕機)

        // lock acquired,獲取到鎖的時候設置定期更新時間的任務
            if (ttlRemaining) {
                scheduleExpirationRenewal(threadId);
            }

            //expirationRenewalMap的並發安全MAP記錄設置過的緩存,避免並發情況下重復設置任務,internalLockLeaseTime / 3的時間後重新設置過期時間
               private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call(‘hexists‘, KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call(‘pexpire‘, KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",
                      Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can‘t update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}

unlock解鎖

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call(‘exists‘, KEYS[1]) == 0) then " +
"redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call(‘hexists‘, KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call(‘hincrby‘, KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call(‘pexpire‘, KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call(‘del‘, KEYS[1]); " +
"redis.call(‘publish‘, KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

Redission的unlock解鎖也是基於Lua腳本實現的,內部邏輯是先判斷鎖是否存在,不存在說明已經被釋放了,發布鎖釋放消息後返回,鎖存在再判斷當前線程是否鎖擁有者,不是的話,無權釋放返回,解鎖的話,會減去重入的次數,重新更新過期時間,若重入數撿完,刪除當前key,發布鎖釋放消息

總結:

主要基於Redis來設計和實現分布式鎖,通過常用的設計思路引申到Redission的實現,無論是設計思路還是代碼健壯性Redission的設計都是優秀的,值得學習,下一步會講解關於Zookeeper的分布式鎖實現和相關開源源碼分析。

分布式鎖實現大型連續劇之(一):Redis