高併發的業務場景如何做到資料一致性的。
阿新 • • 發佈:2019-01-09
一、場景:
1、有資料表:ConCurrency,
CREATE TABLE [dbo].[ConCurrency](
[ID] [int] NOT NULL,
[Total] [int] NULL
)
2、初始值:ID=1,Total = 0
3、現要求每一次客戶端請求Total + 1
二、單執行緒
static void Main(string[] args) { ... new Thread(Run).Start(); ... } public static void Run() { for (int i = 1; i <= 100; i++) { var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); var value = int.Parse(total) + 1; DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); Thread.Sleep(1); } }
2.1 按要求,正常情況下應該輸出:100
2.2 執行結果
貌似沒有問題。
三、多執行緒併發
3.1 Main改一下
static void Main(string[] args)
{
...
new Thread(Run).Start();
new Thread(Run).Start();
...
}
3.2 我們預期應該是要輸出200
3.3 執行結果
很遺憾,卻是150,造成這個結果的原因是這樣的:T1、T2獲取Total(假設此時值為10),T1更新一次或多次後,T2才更新(Total:10)
這就造成之前T1提交的被覆蓋了
3.4 如何避免呢?一般做法加鎖就可以了,如Run改成如下
public static void Run() { for (int i = 1; i <= 100; i++) { lock (resource) { var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); var value = int.Parse(total) + 1; DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); } Thread.Sleep(1); } }
3.5 再次執行
四、用佇列實現
4.1、定義佇列
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
/// <summary>生產者</summary>
public static void Produce()
{
for (int i = 1; i <= 100; i++)
{
queue.Enqueue(i);
}
}
/// <summary>消費者</summary>
public static void Consume()
{
int times;
while (queue.TryDequeue(out times))
{
var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
var value = int.Parse(total) + 1;
DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
Thread.Sleep(1);
}
}
4.2 Main改一下
static void Main(string[] args)
{
...
new Thread(Produce).Start();
new Thread(Produce).Start();
Consume();
...
}
4.3 預期輸出200,看執行結果
4.4 叢集環境下測試,2臺機器
有問題!最後執行的那臺機器居然是379,資料庫也是379。
這超出了我們的預期結果,看來即便加鎖,對於高併發場景也是不能解決所有問題的
五、分散式佇列
5.1 解決上邊問題可以用分散式佇列,這裡用的是redis佇列
/// <summary>生產者</summary>
public static void ProduceToRedis()
{
using (var client = RedisManager.GetClient())
{
for (int i = 1; i <= 100; i++)
{
client.EnqueueItemOnList("EnqueueName", i.ToString());
}
}
}
/// <summary>消費者</summary>
public static void ConsumeFromRedis()
{
using (var client = RedisManager.GetClient())
{
while (client.GetListCount("EnqueueName") > 0)
{
if (client.SetValueIfNotExists("lock", "lock"))
{
var item = client.DequeueItemFromList("EnqueueName");
var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
var value = int.Parse(total) + 1;
DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
client.Remove("lock");
}
Thread.Sleep(5);
}
}
}
5.2 Main也要改改
static void Main(string[] args)
{
...
new Thread(ProduceToRedis).Start();
new Thread(ProduceToRedis).Start();
Thread.Sleep(1000 * 10);
ConsumeFromRedis();
...
}
5.3 在叢集裡再試試,2個都是400,沒有錯(因為每個站點開了2個執行緒)
可以看到資料完全正確!