.net core 下的分散式事務鎖
阿新 • • 發佈:2018-11-16
目錄
系統分散式鎖的用法
公司框架新增功能分散式鎖:
鎖的效能之王: 快取 > Zookeeper >= 資料庫
鎖的實現
實現原理:核心採用StackExchange.Redis的LockTake方法實現。
支援同步獲取鎖,或者等待直到超時獲取鎖。
/// <summary>
/// 分散式鎖,提供全域性分散式鎖支援,以resource redis為基礎
/// 這個鎖只能通過RpcContext來獲取,通過自己手動釋放
/// </summary>
public sealed class DistributedLock
{
private static readonly TimeSpan DefaultAbandonmentCheckFrequency = TimeSpan.FromSeconds(2);
public readonly string lockName;
private readonly string lockValue;
private readonly int checkTimeSpan = 50; //ms
private readonly int autoDelete;
private DistributedLock()
{
}
/// <summary>
///
/// </summary>
/// <param name="lockName"></param>
/// <param name="autoDelete">自動刪除,ms,預設 60s</param>
/// <param name="checkTimeSpan">如果不能獲取鎖,重複檢查間隔:預設 50ms</param>
internal DistributedLock(string lockName, int autoDelete = 60000,int checkTimeSpan = 50)
{
// note that just Global\ is not a valid name
if (string.IsNullOrEmpty(lockName))
throw new ArgumentNullException("lockName不能為空");
if (null == ResourceCache.Instance)
throw new Exception(@"ResourceCache 沒有配置或無法連線");
this.checkTimeSpan = Math.Max(checkTimeSpan,1);
this.autoDelete = Math.Max(autoDelete,1);
this.lockName = lockName;
this.lockValue = lockName;
}
/// <summary>
/// 獲取鎖
/// </summary>
/// <param name="timeout">超時為null,則嘗試一次即返回</param>
/// <returns>獲取鎖成功?</returns>
internal bool Acquire(TimeSpan? timeout = null)
{
bool bLock = false;
var dtStart = DateTime.Now.Ticks;
while (!bLock)
{
bLock = TryAcquireOnce();
if (timeout == null)
{
break;
}
if (!bLock)
{
Thread.Sleep(this.checkTimeSpan);
}
var ts = new TimeSpan(DateTime.Now.Ticks - dtStart);
if (ts >= timeout)
{
break;
}
}
return bLock;
}
//此處採用框架上下文管理分散式事務鎖的釋放,程式碼略。
//public void Dispose()
//{
// LockManager.ReleaseLock(this);
//}
internal void Release()
{
try
{
var bRtn = ResourceCache.Instance.LockRelease(this.lockName, this.lockValue);
Trace.WriteLine($"釋放鎖 {this.lockName}:{bRtn}");
}
catch (Exception e)
{
LogTextWriter.Write($"釋放鎖失敗,系統自動超時釋放:{this.lockName}");
}
}
/// <summary>
/// 釋放鎖
/// </summary>
public void ReleaseLock()
{
LockManager.ReleaseLock(this);
}
private bool TryAcquireOnce()
{
try
{
Trace.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:TryAcquireOnce");
var @lock = ResourceCache.Instance.LockTake(this.lockName, this.lockValue, new TimeSpan(0, 0, 0, 0, this.autoDelete));
return @lock;
}
catch (Exception e)
{
return false;
}
}
}
鎖的使用
在當前上下文中獲取一個分散式鎖,第一個獲取鎖的將執行依賴當前key(一般為業務主鍵)的完整業務流程(包括多個微服務之間的呼叫和資料庫的訪問;
後來者將無法獲取鎖,根據返回的結果來判斷是否進入流程,如果返回的鎖為null將不能執行下面的流程,要麼重試等待鎖釋放,要麼返回錯誤.
鎖的呼叫一般流程:
var qtLock=TryGetLock(lockKey);
if(qtLock==null)
{
//提示不能同時執行操作
return;
}
else
{
//進行業務流程
}
//最後別忘了
qtLock.ReleaseLock();
API內的範例:
code = StatusCode.OK;
//傳入超時時間,可以一直等待到超時過期
var lockSaveReceipt = this.Context.TryGetLock($"{nameof(SaveReceipt)}.{valueArgs.ReceiptArgs.ReceiptId}");
if (lockSaveReceipt == null)
{
code = PublicErrorCode.SaveReceiptByUsed.ToCode();
return null;
}
try{
//todo 業務操作1
//todo 業務操作2
//...
}
finally
{
lockSaveReceipt.ReleaseLock();
}