1. 程式人生 > >Redis學習系列七分散式鎖

Redis學習系列七分散式鎖

一、簡介

熟悉.Net多執行緒的都知道,當多個執行緒同時操作一個全域性快取物件(static物件例項、Dictionary、List等)時,會存在多執行緒爭用問題,包括EF、Dapper等本身的快取機制,都存在多執行緒爭用問題,當我們在享受多執行緒帶來的好處的同時,千萬要注意這個問題.如果不瞭解多執行緒,請移步到我的C#多執行緒分類下.但是實際的業務場景中經常存在需要根據每個快取物件的狀態,進行一系列判斷之後,在進行修改的操作,但是這個操作必須保證有序性,不能多個執行緒同時去讀,否則就亂套了.比如你要進行一個數據庫表字段的遞增操作,首先可能時先去把最後一條記錄讀出來,然後拿到對應的欄位,然後更新回資料庫,但是這個時候如果在多執行緒環境下,多個執行緒可能同時去讀,如果用了EF、Dapeer等ORM,它們會把資料讀到快取中,這個時候多個執行緒拿到了相同的資料,然後同步+1操作,那麼這個時候如果有三個執行緒,那麼只會進行一次+1操作,而不是三次.

Redis也是如此,所以這個時候就需要使用Redis的分散式鎖,來限制這個行為,如果你用了他的非同步Api,我前面的隨筆用的都是非同步的.

 

二、分散式鎖實戰

哇,踩坑踩了還久,終於明白了StackExchange.Redis怎麼處理分散式鎖,我剛開始使用Api時,是這麼認為的.當一個執行緒使用Redis的資料時(該資料已加上了分散式鎖),另外一個執行緒則不能操作這個資料,會等待前面的鎖釋放(這個過程Redis幫助我們完成),但是事實證明我太年輕了.Redis還是Redis,即使時分散式鎖,也是一種快取資料(這一點和C#完全不一樣,所以需要注意).為什麼會這樣呢?看下面的程式碼:

    class Program
    {
        static Program()
        {
            //鏈式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        
static async void StringSetGetAsync() { //需要鎖的Redis資料的鍵 var key = "使用者資訊Id"; //先非同步寫入一個 var kv=new KeyValuePair<RedisKey, RedisValue>(key, 1); await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv }); //對使用者的資訊進行加鎖操作 var time = 100000; //請求Id,這裡最好能區分客戶端的呼叫標識,方便異常時定位錯誤 var requestId = "鎖的鍵"; var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time)); if (lockResult) { Console.WriteLine("對使用者資訊加鎖成功,請求的鎖Id為:{0},鎖的時間週期為:{1}毫秒", key, TimeSpan.FromMilliseconds(time)); } else { Console.WriteLine("加鎖失敗,key為{0}的資料結構已被其它請求佔用!", key); } //這裡開啟一個新的執行緒去訪問Redis,先查,在修改上面的使用者資料,這裡如果Redis幫我們判斷的話,那我們是讀不出資料的,而且不能修改該資料的 //因為上面的執行緒已經對當前使用者資料加鎖了 var result=await RedisClient.StringGetAsync(key); Console.WriteLine("成功查到了資料,值為:{0}", result); var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2); if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv })) { var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key }); Console.WriteLine("資料修改成功,修改後的資料為:{0}",newResult[0]); } } }

look,Redis並沒有幫助我們做了這個事情,他還是讓第二個執行緒去修改了使用者的資料.但是,開啟Redis桌面管理工具,如下資訊:

多了一條資料,裡面記錄了鎖的相關資訊,現在我終於明白了,這個事情還是得自己來,他不會幫你做,他只是個快取,所以修改程式碼如下:

    class Program
    {
        static Program()
        {
            //鏈式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        static async void StringSetGetAsync()
        {
            //需要鎖的Redis資料的鍵
            var key = "使用者資訊Id";

            //先非同步寫入一個
            var kv=new KeyValuePair<RedisKey, RedisValue>(key, 1);
            await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv });
             
            //對使用者的資訊進行加鎖操作
            var time = 100000;
            //請求Id,這裡最好能區分客戶端的呼叫標識,方便異常時定位錯誤
            var requestId = "鎖的鍵";
            var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time));
            if (lockResult)
            {
                Console.WriteLine("對使用者資訊加鎖成功,請求的鎖Id為:{0},鎖的時間週期為:{1}毫秒", key, TimeSpan.FromMilliseconds(time));
            }
            else
            {
                Console.WriteLine("加鎖失敗,key為{0}的資料結構已被其它請求佔用!", key);
            }

            //修改前判斷當前使用者資料有沒有被加鎖
            var userInfoLockObj = (int)(await RedisClient.LockQueryAsync(key));
            if (userInfoLockObj >= 1)
            {
                Console.WriteLine("當前使用者鍵為:{0}的資料被別的執行緒(或者程序佔用了),無法進行操作,請等待鎖釋放!");
            }
            else
            {
                var result = await RedisClient.StringGetAsync(key);
                Console.WriteLine("成功查到了資料,值為:{0}", result);
                var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2);
                if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv }))
                {
                    var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key });
                    Console.WriteLine("資料修改成功,修改後的資料為:{0}", newResult[0]);
                }
            }
            
        }
    }

操作前,自己去查當前使用者資訊有沒有被加鎖,Redis會把鎖的資料寫入快取,並在過期時間到了時,自動回收對應鎖的記憶體,這樣當下個執行緒進來時,查不到鎖,那麼就可以操作這個資料了.這裡我選擇輪詢判斷,當然也可以使用佇列,將請求插入到佇列中,開啟一個執行緒去消費這個佇列,輪詢程式碼如下:

    class Program
    {
        static Program()
        {
            //鏈式配置Redis
            AppConfiguration.Current.ConfigureRedis<RedisConfig>();
        }

        static void Main(string[] args)
        {
            StringSetGetAsync();
            Console.ReadKey();
        }

        static async void StringSetGetAsync()
        {
            //需要鎖的Redis資料的鍵
            var key = "使用者資訊Id";

            //先非同步寫入一個
            var kv = new KeyValuePair<RedisKey, RedisValue>(key, 1);
            await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { kv });

            //對使用者的資訊進行加鎖操作
            var time = 10000;
            //請求Id,這裡最好能區分客戶端的呼叫標識,方便異常時定位錯誤
            var requestId = "鎖的鍵";
            var lockResult = await RedisClient.LockTakeAsync(requestId, key, TimeSpan.FromMilliseconds(time));
            if (lockResult)
            {
                Console.WriteLine("對使用者資訊加鎖成功,請求的鎖Id為:{0},鎖的時間週期為:{1}毫秒", key, TimeSpan.FromMilliseconds(time));
            }
            else
            {
                Console.WriteLine("加鎖失敗,key為{0}的資料結構已被其它請求佔用!", key);
            }

            //輪詢判斷鎖是否被釋放,釋放就修改資料
            while (true)
            {
                var userInfoLockObj =await RedisClient.LockQueryAsync(requestId);
                if (userInfoLockObj== key)
                {
                    //如果加鎖,休息一秒後重試
                    Thread.Sleep(1000);
                    Console.WriteLine("當前使用者被加鎖了,不能修改資料!");
                }
                else
                {
                    //鎖被釋放了,這個時候可以操作資料了
                    var result = await RedisClient.StringGetAsync(key);
                    Console.WriteLine("成功查到了資料,值為:{0}", result);
                    var newKv = new KeyValuePair<RedisKey, RedisValue>(key, 2);
                    if (await RedisClient.StringSetAsync(new KeyValuePair<RedisKey, RedisValue>[] { newKv }))
                    {
                        var newResult = await RedisClient.StringGetAsync(new RedisKey[] { key });
                        Console.WriteLine("資料修改成功,修改後的資料為:{0}", newResult[0]);
                    }
                    break;
                }
              
            }
        }
    }

10秒後,操作資料成功!當然佇列更加友好.不阻塞執行緒!這個例子舉得也不是很好,大多數情況下,鎖不會被一個執行緒佔用這麼久,一般用完就被釋放,1秒已經很長了.

注:這個過程不會存在死鎖的問題(除非Redis內部的設定過期的程序掛了),因為現在這個版本的Redis支援setnc和expire一起執行,屬於原子指令,即在設定鎖的同時設定過期時間.這個過程是同步的,據我所知老版本的Redis可能這兩個指令時分開的,可能會存在"長生不老"的鎖.

 

三、分散式鎖超時問題

如果你理解上面的內容,就會發現分散式鎖,並不能解決超時問題,感覺這一點和C#自帶的Timer類的問題很像,執行緒不會等待你執行完畢,在開啟第二輪的輪詢任務,執行緒不會等你.Timer中我提供瞭解決方案,Redis也存在相同的問題,但是兩者的解決方案不一樣,Timer是通過回撥的方式,當第一輪迴圈任務做完,在重啟Timer,執行第二輪任務.

而Redis則需要通過守護執行緒的方式去做,程式碼如下: