1. 程式人生 > >RedisTemplate實現分散式鎖

RedisTemplate實現分散式鎖

使用Redis的SETNX命令獲取分散式鎖的步驟:

  • C1和C2執行緒同時檢查時間戳獲取鎖,執行SETNX命令並都返回0,此時鎖仍被C3持有,並且C3已經崩潰
  • C1 DEL
  • C1 使用SETNX命令獲取鎖,並且成功
  • C2 DEL
  • C2 使用SETNX命令獲取鎖,並且成功
  • ERROR : 由於競態條件,C1和C2都獲取到了鎖

幸運的是,以下面的步驟完全可以避免這種情況發生,看看C4執行緒如何操作

  • C4使用SETNX命令獲取鎖
  • C3已經崩潰但是仍然持有鎖,所以Redis返回0給C4
  • C4使用GET命令獲取鎖並檢查鎖是否已經過期,如果沒有過期,則繼續等待一段時間並重新重試
  • 如果鎖已經過期,C4嘗試 GETSET lock.foo <current Unix timestamp + lock timeout + 1>
  • 利用GETSET語法,C4可以檢查舊時間是否仍然是過期時間,如果是,則獲取鎖
  • 如果另一個客戶端C5率先獲取到鎖,C4執行GETSET命令後將返回非過期時間,然後C4繼續從頭開始重新嘗試獲取鎖。此操作C4將延長一點C5獲取到的鎖的過期時間,不過這不是什麼大問題。


接下來我們用程式碼的形式展現:

package com.shuige.components.cache.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Description: 通用Redis幫助類
 * User: zhouzhou
 * Date: 2018-09-05
 * Time: 15:39
 */
@Component
public class CommonRedisHelper {

    public static final String LOCK_PREFIX = "redis_lock";
    public static final int LOCK_EXPIRE = 300; // ms

    @Autowired
    RedisTemplate redisTemplate;

   
    /**
     *  最終加強分散式鎖
     *
     * @param key key值
     * @return 是否獲取到
     */
    public boolean lock(String key){
        String lock = LOCK_PREFIX + key;
        // 利用lambda表示式
        return (Boolean) redisTemplate.execute((RedisCallback) connection -> {

            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes());


            if (acquire) {
                return true;
            } else {

                byte[] value = connection.get(lock.getBytes());

                if (Objects.nonNull(value) && value.length > 0) {

                    long expireTime = Long.parseLong(new String(value));

                    if (expireTime < System.currentTimeMillis()) {
                        // 如果鎖已經過期
                        byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                        // 防止死鎖
                        return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                    }
                }
            }
            return false;
        });
    }

    /**
     * 刪除鎖
     *
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

}

如何使用呢,匯入工具類後:

 boolean lock = redisHelper.lock(key);
        if (lock) {
            // 執行邏輯操作
            redisHelper.delete(key);
        } else {
            // 設定失敗次數計數器, 當到達5次時, 返回失敗
            int failCount = 1;
            while(failCount <= 5){
                // 等待100ms重試
                try {
                    Thread.sleep(100l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (redisHelper.lock(key)){
                   // 執行邏輯操作
                    redisHelper.delete(key);
                }else{
                    failCount ++;
                }
            }
            throw new RuntimeException("現在建立的人太多了, 請稍等再試");
        }

加鎖成功執行完邏輯後, 必須解鎖, 否則只能靠鎖機制來解鎖了不建議這麼做