dubbo 常用的基於redis的分散式鎖實現
阿新 • • 發佈:2018-11-27
小弟本著先會用在學習原理的原則 先用了dubbo 現在在實際業務中 因為分散式專案做了叢集,需要用的分散式鎖,就用到了基於redis的分散式鎖,廢話不多說,先來程式碼:
package com.tiancaibao.utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Redis distributed lock implementation. * * @author qingzhipeng */ public class RedisLock { private static Logger logger = LoggerFactory.getLogger(RedisLock.class); private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100; /** * Lock key path. */ private String lockKey; /** * 鎖超時時間,防止執行緒在入鎖以後,無限的執行等待 */ private int expireMsecs = 60 * 1000; /** * 鎖等待時間,防止執行緒飢餓 */ private int timeoutMsecs = 10 * 1000; private volatile boolean locked = false; /** * Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. * * @param lockKey lock key (ex. account:1, ...) */ public RedisLock(String lockKey) { this.lockKey = lockKey + "_lock"; } /** * Detailed constructor with default lock expiration of 60000 msecs. * */ public RedisLock(String lockKey, int timeoutMsecs) { this(lockKey); this.timeoutMsecs = timeoutMsecs; } /** * Detailed constructor. * */ public RedisLock(String lockKey, int timeoutMsecs, int expireMsecs) { this(lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } /** * @return lock key */ public String getLockKey() { return lockKey; } private String get(final String key) { Object obj = null; try { obj=JedisClusterUtil.clusterGetKey(key); } catch (Exception e) { logger.error("get redis error, key : {}", key); } return obj != null ? obj.toString() : null; } private boolean setNX(final String key, final String value) { Long result = 0l; try { result=JedisClusterUtil.clusterSetNxKey(key, value); } catch (Exception e) { logger.error("setNX redis error, key : {}", key); } return result== 1l ? true : false; } private String getSet(final String key, final String value) { String result = null; try { result=JedisClusterUtil.clusterGetSetKey(key, value); } catch (Exception e) { logger.error("setNX redis error, key : {}", key); } return result; } /** * 獲得 lock. * 實現思路: 主要是使用了redis 的setnx命令,快取了鎖. * reids快取的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其超時時間) * 執行過程: * 1.通過setnx嘗試設定某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖 * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設定新的值 * * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException in case of thread interruption */ public synchronized boolean lock() throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //鎖到期時間 if (this.setNX(lockKey, expiresStr)) { // lock acquired locked = true; return true; } String currentValueStr = this.get(lockKey); //redis裡的時間 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //判斷是否為空,不為空的情況下,如果被其他執行緒設定了值,則第二個條件判斷是過不去的 // lock is expired String oldValueStr = this.getSet(lockKey, expiresStr); //獲取上一個鎖到期時間,並設定現在的鎖到期時間, //只有一個執行緒才能獲取上一個線上的設定時間,因為jedis.getSet是同步的 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆蓋,但是因為什麼相差了很少的時間,所以可以接受 //[分散式的情況下]:如過這個時候,多個執行緒恰好都到了這裡,但是隻有一個執行緒的設定值和當前值相同,他才有權利獲取鎖 // lock acquired locked = true; return true; } } timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS; /* 延遲100 毫秒, 這裡使用隨機時間可能會好一點,可以防止飢餓程序的出現,即,當同時到達多個程序, 只會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足. 使用隨機的等待時間可以一定程度上保證公平性 */ Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS); } return false; } /** * Acqurired lock release. */ public synchronized void unlock() { if (locked) { JedisClusterUtil.clusterDelKey(lockKey); locked = false; } } }
小弟請教了群裡的大牛,發現這種做法很常見。但一些細節性的問題還是要好好琢磨一下,他是怎麼實現分散式鎖的呢?
簡單說 就是講鎖的型別 與 超時時間組合成key-value模式 存放setnx 到redis中。
當多臺伺服器 的同一個介面產生併發時,業務正常的情況下:
c0設定了鎖
c1
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis())
執行緒無法通過這個判斷。再來個c2也無法進入。所以正常業務 就是ok的。
當c0執行緒因為服務宕機或者業務流程過長導致超時呢? 沒有釋放鎖的時候呢。
上面的if判斷就不能阻擋了,但
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
可以阻擋,當c1和c2併發進入時:
C1使用getSet方法
C2也執行了getSet方法,能保證C1和C2只能一個能獲得鎖(c1和c2都將鎖的時間賦值總有且一個獲取時與currentValueStr相等),一個只能繼續等待。,這樣就保證了業務異常時所得釋放。
注意:這裡可能導致超時時間不是其原本的超時時間,C1的超時時間可能被C2覆蓋了,但是他們相差的毫秒及其小,這裡忽略了。
但是:釋放鎖 還需要一些注意的地方。那就是判斷一下是否超時
//為了讓分散式鎖的演算法更穩鍵些,持有鎖的客戶端在解鎖之前應該再檢查一次自己的鎖是否已經超時,再去做DEL操作,因為可能客戶端因為某個耗時的操作而掛起, //操作完的時候鎖因為超時已經被別人獲得,這時就不必解鎖了。
RedisLock redisLock = new RedisLock("userInitialFix");
try {
if (redisLock.lock()) {// 獲取鎖,如果成功進行查詢資料庫匹配債權
selectMaxMoneyByAnyThing = debtOriginalAssetBillsService.selectMaxMoneyByAnyThing(days[i],
"OLD_PRODUCT", amount);
DebtOriginalAssetBillsWithBLOBs new_DebtOriginalAsset = new DebtOriginalAssetBillsWithBLOBs();
if (selectMaxMoneyByAnyThing != null) {
new_DebtOriginalAsset.setId(selectMaxMoneyByAnyThing.getId());
new_DebtOriginalAsset.setRemainAmount(selectMaxMoneyByAnyThing.getRemainAmount() - amount);
new_DebtOriginalAsset.setArrivalAmount(selectMaxMoneyByAnyThing.getArrivalAmount() + amount);
debtOriginalAssetBillsService.updateSelectiveById(new_DebtOriginalAsset);// 更新債權表
break;
}
} else {
// 等一秒繼續進行匹配防止無謂迴圈
Thread.sleep(1000);
// 繼續去呼叫
return matchDebtOriginalAsset(day, amount);
}
} catch (Exception e) {
System.out.println("使用者初始化定期金額出錯!!");
e.printStackTrace();
} finally {
redisLock.unlock();// 釋放鎖
}
上述程式碼 就差了一個超時的處理。
參考https://www.cnblogs.com/0201zcr/p/5942748.html