1. 程式人生 > >高併發的業務場景如何做到資料一致性的。

高併發的業務場景如何做到資料一致性的。

一、場景:

1、有資料表:ConCurrency,

CREATE TABLE [dbo].[ConCurrency](
    [ID] [int] NOT NULL,
     [Total] [int] NULL
 )

2、初始值:ID=1,Total = 0

3、現要求每一次客戶端請求Total + 1

二、單執行緒

         static void Main(string[] args)
         {
            ...
            new Thread(Run).Start();
             ...
        }
 
        public static void Run()
         {
             for (int i = 1; i <= 100; i++)
             {
                     var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
                     var value = int.Parse(total) + 1;

                    DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
                     Thread.Sleep(1);
            }
         }

2.1 按要求,正常情況下應該輸出:100

2.2 執行結果

貌似沒有問題。

三、多執行緒併發

3.1 Main改一下

         static void Main(string[] args)
         {
             ...
            new Thread(Run).Start();
            new Thread(Run).Start();
             ...
         }

3.2 我們預期應該是要輸出200

3.3 執行結果

很遺憾,卻是150,造成這個結果的原因是這樣的:T1、T2獲取Total(假設此時值為10),T1更新一次或多次後,T2才更新(Total:10)

這就造成之前T1提交的被覆蓋了

3.4 如何避免呢?一般做法加鎖就可以了,如Run改成如下

         public static void Run()
         {
             for (int i = 1; i <= 100; i++)
            {
                 lock (resource)
                 {
                     var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
                     var value = int.Parse(total) + 1;
 
                     DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
                 }
 
                 Thread.Sleep(1);
            }
         }

3.5 再次執行

四、用佇列實現

4.1、定義佇列

static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
         /// <summary>生產者</summary>
        public static void Produce()
         {
             for (int i = 1; i <= 100; i++)
             {
                 queue.Enqueue(i);
             }
         }
 
         /// <summary>消費者</summary>
         public static void Consume()
        {
             int times;
             while (queue.TryDequeue(out times))
             {
                 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
                var value = int.Parse(total) + 1;
 
                 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
                 Thread.Sleep(1);
             }
         }

4.2 Main改一下

         static void Main(string[] args)
         {
             ...
            new Thread(Produce).Start();
             new Thread(Produce).Start();
            Consume();
             ...
         }

4.3 預期輸出200,看執行結果

4.4 叢集環境下測試,2臺機器

有問題!最後執行的那臺機器居然是379,資料庫也是379。

這超出了我們的預期結果,看來即便加鎖,對於高併發場景也是不能解決所有問題的

五、分散式佇列 

5.1 解決上邊問題可以用分散式佇列,這裡用的是redis佇列

         /// <summary>生產者</summary>
         public static void ProduceToRedis()
         {
             using (var client = RedisManager.GetClient())
             {
                 for (int i = 1; i <= 100; i++)
                {
                     client.EnqueueItemOnList("EnqueueName", i.ToString());
                 }
            }
         }
 
         /// <summary>消費者</summary>
         public static void ConsumeFromRedis()
         {
            using (var client = RedisManager.GetClient())
             {
                 while (client.GetListCount("EnqueueName") > 0)
                 {
                    if (client.SetValueIfNotExists("lock", "lock"))
                     {
                         var item = client.DequeueItemFromList("EnqueueName");
                         var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
                         var value = int.Parse(total) + 1;
 
                         DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
 
                         client.Remove("lock");
                     }

                     Thread.Sleep(5);
                 }
             }
         }

5.2 Main也要改改

         static void Main(string[] args)
         {
            ...
             new Thread(ProduceToRedis).Start();
             new Thread(ProduceToRedis).Start();
             Thread.Sleep(1000 * 10);
 
             ConsumeFromRedis();
             ...
         }

5.3 在叢集裡再試試,2個都是400,沒有錯(因為每個站點開了2個執行緒)

可以看到資料完全正確!