1. 程式人生 > >dubbo 常用的基於redis的分散式鎖實現

dubbo 常用的基於redis的分散式鎖實現

 

小弟本著先會用在學習原理的原則 先用了dubbo 現在在實際業務中 因為分散式專案做了叢集,需要用的分散式鎖,就用到了基於redis的分散式鎖,廢話不多說,先來程式碼:

package com.tiancaibao.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Redis distributed lock implementation.
 *	
 * @author qingzhipeng
 */
public class RedisLock {

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
    
    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    /**
     * Lock key path.
     */
    private String lockKey;

    /**
     * 鎖超時時間,防止執行緒在入鎖以後,無限的執行等待
     */
    private int expireMsecs = 60 * 1000;

    /**
     * 鎖等待時間,防止執行緒飢餓
     */
    private int timeoutMsecs = 10 * 1000;

    private volatile boolean locked = false;

    /**
     * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs.
     *
     * @param lockKey lock key (ex. account:1, ...)
     */
    public RedisLock(String lockKey) {
        this.lockKey = lockKey + "_lock";
    }

    /**
     * Detailed constructor with default lock expiration of 60000 msecs.
     *
     */
    public RedisLock(String lockKey, int timeoutMsecs) {
        this(lockKey);
        this.timeoutMsecs = timeoutMsecs;
    }

    /**
     * Detailed constructor.
     *
     */
    public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) {
        this(lockKey, timeoutMsecs);
        this.expireMsecs = expireMsecs;
    }

    /**
     * @return lock key
     */
    public String getLockKey() {
        return lockKey;
    }

    private String get(final String key) {
        Object obj = null;
        try {
        	obj=JedisClusterUtil.clusterGetKey(key);
        } catch (Exception e) {
            logger.error("get redis error, key : {}", key);
        }
        return obj != null ? obj.toString() : null;
    }

    private boolean setNX(final String key, final String value) {
        Long result = 0l;
        try {
        	result=JedisClusterUtil.clusterSetNxKey(key, value);
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return result== 1l ? true : false;
    }

    private String getSet(final String key, final String value) {
        String result = null;
        try {
        	result=JedisClusterUtil.clusterGetSetKey(key, value);
        } catch (Exception e) {
            logger.error("setNX redis error, key : {}", key);
        }
        return result;
    }

    /**
     * 獲得 lock.
     * 實現思路: 主要是使用了redis 的setnx命令,快取了鎖.
     * reids快取的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其超時時間)
     * 執行過程:
     * 1.通過setnx嘗試設定某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖
     * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設定新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //鎖到期時間
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); //redis裡的時間
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否為空,不為空的情況下,如果被其他執行緒設定了值,則第二個條件判斷是過不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                //獲取上一個鎖到期時間,並設定現在的鎖到期時間,
                //只有一個執行緒才能獲取上一個線上的設定時間,因為jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆蓋,但是因為什麼相差了很少的時間,所以可以接受

                    //[分散式的情況下]:如過這個時候,多個執行緒恰好都到了這裡,但是隻有一個執行緒的設定值和當前值相同,他才有權利獲取鎖
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延遲100 毫秒,  這裡使用隨機時間可能會好一點,可以防止飢餓程序的出現,即,當同時到達多個程序,
                只會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足.
                使用隨機的等待時間可以一定程度上保證公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
        return false;
    }


    /**
     * Acqurired lock release.
     */
    public synchronized void unlock() {
        if (locked) {
        	JedisClusterUtil.clusterDelKey(lockKey);
            locked = false;
        }
    }

}

小弟請教了群裡的大牛,發現這種做法很常見。但一些細節性的問題還是要好好琢磨一下,他是怎麼實現分散式鎖的呢?

簡單說 就是講鎖的型別 與 超時時間組合成key-value模式 存放setnx  到redis中。

當多臺伺服器 的同一個介面產生併發時,業務正常的情況下:

c0設定了鎖

c1

if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis())

執行緒無法通過這個判斷。再來個c2也無法進入。所以正常業務  就是ok的。

當c0執行緒因為服務宕機或者業務流程過長導致超時呢? 沒有釋放鎖的時候呢。

上面的if判斷就不能阻擋了,但

if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {

可以阻擋,當c1和c2併發進入時:

C1使用getSet方法

C2也執行了getSet方法,能保證C1和C2只能一個能獲得鎖(c1和c2都將鎖的時間賦值總有且一個獲取時與currentValueStr相等),一個只能繼續等待。,這樣就保證了業務異常時所得釋放。

注意:這裡可能導致超時時間不是其原本的超時時間,C1的超時時間可能被C2覆蓋了,但是他們相差的毫秒及其小,這裡忽略了。

但是:釋放鎖 還需要一些注意的地方。那就是判斷一下是否超時

//為了讓分散式鎖的演算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起,
            //操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。
RedisLock redisLock = new RedisLock("userInitialFix");
			try {
				if (redisLock.lock()) {// 獲取鎖,如果成功進行查詢資料庫匹配債權
					selectMaxMoneyByAnyThing = debtOriginalAssetBillsService.selectMaxMoneyByAnyThing(days[i],
							"OLD_PRODUCT", amount);
					DebtOriginalAssetBillsWithBLOBs new_DebtOriginalAsset = new DebtOriginalAssetBillsWithBLOBs();
					if (selectMaxMoneyByAnyThing != null) {
						new_DebtOriginalAsset.setId(selectMaxMoneyByAnyThing.getId());

						new_DebtOriginalAsset.setRemainAmount(selectMaxMoneyByAnyThing.getRemainAmount() - amount);
						new_DebtOriginalAsset.setArrivalAmount(selectMaxMoneyByAnyThing.getArrivalAmount() + amount);
						debtOriginalAssetBillsService.updateSelectiveById(new_DebtOriginalAsset);// 更新債權表
						break;
					}
				} else {
					// 等一秒繼續進行匹配防止無謂迴圈
					Thread.sleep(1000);
					// 繼續去呼叫
					return matchDebtOriginalAsset(day, amount);
				}
			} catch (Exception e) {
				System.out.println("使用者初始化定期金額出錯!!");
				e.printStackTrace();
			} finally {
				redisLock.unlock();// 釋放鎖
			}

上述程式碼 就差了一個超時的處理。

參考https://www.cnblogs.com/0201zcr/p/5942748.html