1. 程式人生 > >Redis 分散式鎖(二)

Redis 分散式鎖(二)

續上篇 Redis 分散式鎖

超時問題


Redis 的分散式鎖不能解決超時問題,如果在加鎖和釋放鎖之間的邏輯執行的太長,以至於超出了鎖的超時限制,就會出現問題。因為這時候鎖A過期了,第二個執行緒重新持有了這把鎖A,但是緊接著第一個執行緒執行完了業務邏輯,就把鎖A給釋放了,問題是第二個執行緒還沒執行完,鎖A就被第一個釋放了,第三個執行緒就會在第二個執行緒邏輯執行完之之前拿到了鎖。

為了避免這個問題,Redis 分散式鎖不要用於較長時間的任務。如果真的偶爾出現了,資料出現的小波錯亂可能需要人工介入解決。

int tag = random.nextint()  # 隨機數
if redis.set(key, tag, nx=True, ex=5){
    do_something();
    redis.delifequals

(key, tag) ; // 假想的 delete if equals 指令

}

有一個稍微安全一點的方案是為 set 指令的 value 引數設定為一個隨機數,釋放鎖時先匹配隨機數是否一致,然後再刪除 key,這是為了確保當前執行緒佔有的鎖不會被其它執行緒釋放,除非這個鎖是過期了被伺服器自動釋放的。 但是匹配 value 和刪除 key 不是一個原子操作,Redis 也沒有提供類似於delete if equals這樣的指令,這就需要使用 Lua 指令碼來處理了,因為 Lua 指令碼可以保證連續多個指令的原子性執行。

# delifequals
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

但是這也不是一個完美的方案,它只是相對安全一點,因為如果真的超時了,當前執行緒的邏輯沒有執行完,其它執行緒也會乘虛而入。

可重入性

關於可重入這一概念,我們需要參考維基百科。

若一個程式或子程式可以“在任意時刻被中斷然後作業系統排程執行另外一段程式碼,這段程式碼又呼叫了該子程式不會出錯”,則稱其為可重入(reentrant或re-entrant)的。即當該子程式正在執行時,執行執行緒可以再次進入並執行它,仍然獲得符合設計時預期的結果。與多執行緒併發執行的執行緒安全不同,可重入強調對單個執行緒執行時重新進入同一個子程式仍然是安全的。

那麼可重入性也可以理解為執行緒在持有鎖的情況下再次請求加鎖,如果一個鎖支援同一個執行緒的多次加鎖,那麼這個鎖就是可重入的。比如 Java 語言裡有個 ReentrantLock 就是可重入鎖。Redis 分散式鎖如果要支援可重入,需要對客戶端的 set 方法進行包裝,使用執行緒的 Threadlocal 變數儲存當前持有鎖的計數。不推薦使用可重入鎖,它加重了客戶端的複雜性,在編寫業務方法時注意在邏輯結構上進行調整完全可以不使用可重入鎖。下面是 Java 版本的可重入鎖。

 public class RedisWithReentrantLock {

  private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();

  private Jedis jedis;

  public RedisWithReentrantLock(Jedis jedis) {
    this.jedis = jedis;
  }

  private boolean _lock(String key) {
    return jedis.set(key, "", "nx", "ex", 5L) != null;
  }

  private void _unlock(String key) {
    jedis.del(key);
  }

  private Map<String, Integer> currentLockers() {
    Map<String, Integer> refs = lockers.get();
    if (refs != null) {
      return refs;
    }
    lockers.set(new HashMap<>());
    return lockers.get();
  }

  public boolean lock(String key) {
    Map<String, Integer> refs = currentLockers();
    Integer refCnt = refs.get(key);
    if (refCnt != null) {
      refs.put(key, refCnt + 1);
      return true;
    }
    boolean ok = this._lock(key);
    if (!ok) {
      return false;
    }
    refs.put(key, 1);
    return true;
  }

 

public boolean unlock(String key) {
    Map<String, Integer> refs = currentLockers();
    Integer refCnt = refs.get(key);
    if (refCnt == null) {
      return false;
    }
    refCnt -= 1;
    if (refCnt > 0) {
      refs.put(key, refCnt);
    } else {
      refs.remove(key);
      this._unlock(key);
    }
    return true;
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
    System.out.println(redis.lock("codehole"));
    System.out.println(redis.lock("codehole"));
    System.out.println(redis.unlock("codehole"));
    System.out.println(redis.unlock("codehole"));
  }

}

鎖衝突處理
 

一般有 3 種策略來處理加鎖失敗:

1、直接丟擲異常,通知使用者稍後重試;
2、sleep 一會再重試;
3、將請求轉移至延時佇列,過一會再試;

直接丟擲特定型別的異常;

這種方式比較適合由使用者直接發起的請求,使用者看到錯誤對話方塊後,會先閱讀對話方塊的內容,再點選重試,這樣就可以起到人工延時的效果。如果考慮到使用者體驗,可以由前端的程式碼替代使用者自己來進行延時重試控制。它本質上是對當前請求的放棄,由使用者決定是否重新發起新的請求。

sleep

sleep 會阻塞當前的訊息處理執行緒,會導致佇列的後續訊息處理出現延遲。如果碰撞的比較頻繁或者佇列裡訊息比較多,sleep 可能並不合適。如果因為個別死鎖的 key 導致加鎖不成功,執行緒會徹底堵死,導致後續訊息永遠得不到及時處理。

延時佇列

這種方式比較適合非同步訊息處理,將當前衝突的請求扔到另一個佇列延後處理以避開衝突。 

 下一節講 Redis 訊息延時佇列