.NET Core with Redis: simple use of StackExchange.Redis for queues, publish/subscribe, and distributed locks

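Environment: until now I had been using the ServiceStack.Redis client.
Today I am trying StackExchange.Redis (it feels more user-friendly to me, it is also free, and its performance should not be much worse).
The versions used are StackExchange.Redis 2.1.58 and .NET Core 3.1.

A brief introduction (the terminology is drawn from online material and the Redis site). Site referenced: https://www.redis.net.cn/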

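Redis is an open-source key-value database written in C. It is accessed over the network, keeps its data in memory, can optionally persist that data to disk, and offers APIs for many languages.

Redis is a high-performance key-value store. To a large extent it makes up for the shortcomings of key/value stores such as memcached.

Clients are available for Java, C/C++, C#, PHP, JavaScript, Perl, Objective-C, Python, Ruby, Erlang, and others, so it is convenient to use.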

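Advantages:

1 Redis has excellent read/write performance. Data is served from memory, so I/O is fast, and it can handle more than 100K reads/writes per second.

2 Redis supports operations on Strings, Lists, Hashes, Sets, and Sorted Sets.

3 Redis supports data persistence, with two mechanisms: AOF and RDB.

4 Redis supports master-replica replication. The master automatically synchronizes data to its replicas, which makes read/write splitting possible.

5 Every individual Redis command is atomic, and Redis also supports executing several commands together atomically (transactions).

6 Redis executes commands on a single thread, so command execution inside the server needs no locking and unlocking and cannot deadlock, which keeps it efficient.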

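1: Redis queue, enqueue and dequeue. Screenshot results:

Before optimization the run took nearly 50 seconds, which is far too slow.

After optimization it took about 5.55 s. (Judging from the code below, the faster version appears to be the one that reuses a single shared ConnectionMultiplexer instead of connecting on every call.)

2: Redis publish/subscribe. Screenshot result (one publisher, four subscribers).

3: Redis flash sale. Screenshot: with a single process the flash sale works correctly.

When several processes run at once, overselling occurs.

4: Redis distributed lock. Screenshot:

This approach is fairly resource-intensive, though.

With the distributed lock the result is correct. Once the stock reaches zero, stop sending requests and simply throw an exception.

The test screenshots above briefly introduced Redis queues (enqueue and dequeue), Redis publish/subscribe, and Redis distributed locks. Now straight to the code: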

Enqueue/dequeue Web API controller (ASP.NET Core 3.1):

using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using RedisPublishAndSubHelper;

namespace WebApp.Controllers
{
    // ApiResultObject and ResultCode come from the author's project and are not shown in this post.
    [Route("api/[Controller]")]
    [ApiController]
    public class RedisTestController : ControllerBase
    {
        // Pushes 1000 values (redisValue + 0 .. redisValue + 999) onto the list stored at rediskey.
        [HttpGet("EnqueueMsg")]
        public async Task<ApiResultObject> EnqueueMsgAsync(string rediskey, string redisValue)
        {
            ApiResultObject obj = new ApiResultObject();
            try
            {
                long enqueueLong = default;
                for (int i = 0; i < 1000; i++)
                {
                    enqueueLong = await MyRedisSubPublishHelper.EnqueueListLeftPushAsync(rediskey, redisValue + i);
                }
                obj.Code = ResultCode.Success;
                obj.Data = "Queue length after enqueue: " + enqueueLong;
                obj.Msg = "Enqueue succeeded!";
            }
            catch (Exception ex)
            {
                obj.Msg = $"Enqueue failed, reason: {ex.Message}";
            }
            return obj;
        }

        // Pops one value from the right end of the list stored at rediskey.
        [HttpGet("DequeueMsg")]
        public async Task<ApiResultObject> DequeueMsgAsync(string rediskey)
        {
            ApiResultObject obj = new ApiResultObject();
            try
            {
                string dequeueMsg = await MyRedisSubPublishHelper.DequeueListPopRightAsync(rediskey);
                obj.Code = ResultCode.Success;
                obj.Data = $"Dequeued value: {dequeueMsg}";
                obj.Msg = "Dequeue succeeded!";
            }
            catch (Exception ex)
            {
                obj.Msg = $"Dequeue failed, reason: {ex.Message}";
            }
            return obj;
        }
    }
}
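
The EnqueueMsg action above awaits 1,000 separate pushes, each costing one network round trip. As a rough sketch of one alternative, all values could be sent in a single LPUSH by passing an array to ListLeftPushAsync. The class name QueueBatchSketch, the method EnqueueBatchAsync, and the localhost:6379 connection string are assumptions made for illustration and are not part of the original project.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

// Hypothetical helper for illustration; not part of the original post.
public static class QueueBatchSketch
{
    // Assumed connection string; point this at your own Redis server.
    // Lazy<T> creates the shared multiplexer once, on first use.
    private static readonly Lazy<ConnectionMultiplexer> Connection =
        new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect("localhost:6379"));

    // Pushes valuePrefix + 0 .. valuePrefix + (count - 1) onto the left of the list
    // with one LPUSH command and returns the list length Redis reports afterwards.
    public static Task<long> EnqueueBatchAsync(string queueName, string valuePrefix, int count)
    {
        RedisValue[] values = Enumerable.Range(0, count)
            .Select(i => (RedisValue)(valuePrefix + i))
            .ToArray();

        IDatabase db = Connection.Value.GetDatabase();
        return db.ListLeftPushAsync(queueName, values);
    }
}
```

With this sketch, the loop in the action could become something like `long length = await QueueBatchSketch.EnqueueBatchAsync(rediskey, redisValue, 1000);`. The elements end up in the same order as with 1,000 individual left pushes.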

Client code for publishing messages and for the flash sale with the distributed lock:

using System;
using System.Threading;
using System.Threading.Tasks;
using RedisPublishAndSubHelper;

namespace RedisPublishService
{
    class Program
    {
        static void Main(string[] args)
        {
            #region Publish messages
            {
                int index = 100000;
                while (index > 0)
                {
                    //string msg = Console.ReadLine();
                    new MyRedisSubPublishHelper().PublishMessage("nihaofengge", $"Hello Fengge, GUID value: {DateTime.Now}{Guid.NewGuid()}");
                    Console.WriteLine("Published successfully!");
                    index -= 1;
                }
                Console.ReadKey();
            }
            #endregion

            #region Flash sale
            {
                try
                {
                    Console.WriteLine("Flash sale started...");
                    for (int i = 0; i < 200; i++)
                    {
                        // Each task is waited on immediately, so purchases inside one process run one at a time.
                        Task.Run(() =>
                        {
                            MyRedisSubPublishHelper.LockByRedis("mstest");
                            try
                            {
                                string productCount = MyRedisHelper.StringGet("productcount");
                                int pcount = int.Parse(productCount);
                                if (pcount > 0)
                                {
                                    long dlong = MyRedisHelper.StringDec("productcount");
                                    Console.WriteLine($"Purchase succeeded, remaining stock: {dlong}");
                                    Thread.Sleep(30);
                                }
                                else
                                {
                                    Console.WriteLine("Purchase failed, the stock is zero!");
                                    throw new Exception("Product stock is zero!"); // throwing here is the safer option
                                }
                            }
                            finally
                            {
                                // Release the lock even when the stock check throws.
                                MyRedisSubPublishHelper.UnLockByRedis("mstest");
                            }
                        }).Wait();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Flash sale finished, reason: {ex.Message}");
                }
                Console.ReadKey();
            }
            #endregion
        }
    }
}
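
The flash-sale loop above reads the stock with StringGet and decrements it in a separate call, which is what lets two processes both see a positive stock and oversell when no lock is held. A possible lock-free sketch relies on the Redis decrement being atomic: decrement first, then inspect the returned value, and undo the decrement if it went below zero. The class FlashSaleSketch, the method TryPurchase, and the connection string are hypothetical. Only the productcount key is taken from the code above.

```csharp
using System;
using StackExchange.Redis;

// Hypothetical helper for illustration; not part of the original post.
public static class FlashSaleSketch
{
    // Assumed connection string; point this at your own Redis server.
    private static readonly Lazy<ConnectionMultiplexer> Connection =
        new Lazy<ConnectionMultiplexer>(() => ConnectionMultiplexer.Connect("localhost:6379"));

    // Returns true when one unit of stock was reserved for the caller.
    public static bool TryPurchase(string stockKey = "productcount")
    {
        IDatabase db = Connection.Value.GetDatabase();

        // The decrement is atomic on the server, so concurrent callers
        // each receive a distinct resulting value.
        long remaining = db.StringDecrement(stockKey);
        if (remaining >= 0)
        {
            return true;
        }

        // The stock was already exhausted: put back the unit taken above.
        db.StringIncrement(stockKey);
        return false;
    }
}
```

In this sketch the counter can dip below zero for a moment while failed buyers restore it, so other readers of the key should treat any value at or below zero as sold out.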

 

The complete Redis helper classes (only some of the wrapped methods have been tested and used):

using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace RedisPublishAndSubHelper
{
    // Every method here opens and disposes its own ConnectionMultiplexer.
    // StackExchange.Redis is designed for one multiplexer to be shared and reused,
    // so connecting on every call, as done here, is expensive.
    public class MyRedisHelper
    {
        private static readonly string connectionRedisStr;

        static MyRedisHelper()
        {
            // Initialize configuration here.
            connectionRedisStr = "12.23.45.12:6379,connectTimeout=1000,connectRetry=3,syncTimeout=10000";
        }

        #region Common synchronous string operations
        public static bool StringSet(string key, string stringValue, double seconds = 60)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                IDatabase db = conn.GetDatabase();
                return db.StringSet(key, stringValue, TimeSpan.FromSeconds(seconds));
            }
        }

        public static string StringGet(string key)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                IDatabase db = conn.GetDatabase();
                return db.StringGet(key);
            }
        }

        public static long StringInc(string key)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                IDatabase db = conn.GetDatabase();
                return db.StringIncrement(key);
            }
        }

        public static long StringDec(string key)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                IDatabase db = conn.GetDatabase();
                return db.StringDecrement(key);
            }
        }

        public static bool KeyExists(string key)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                IDatabase db = conn.GetDatabase();
                return db.KeyExists(key);
            }
        }
        #endregion

        #region List, Hash, Set, and ZSet are used in much the same way and are fairly simple; to be added later
        #endregion

        #region Enqueue and dequeue

        #region Enqueue
        /// <summary>Enqueues a value at the right end of the list.</summary>
        public static long EnqueueListRightPush(RedisKey queueName, RedisValue redisValue)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                return conn.GetDatabase().ListRightPush(queueName, redisValue);
            }
        }

        /// <summary>Enqueues a value at the left end of the list.</summary>
        public static long EnqueueListLeftPush(RedisKey queueName, RedisValue redisvalue)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                return conn.GetDatabase().ListLeftPush(queueName, redisvalue);
            }
        }

        /// <summary>Enqueues a value at the left end of the list (async).</summary>
        public static async Task<long> EnqueueListLeftPushAsync(RedisKey queueName, RedisValue redisvalue)
        {
            using (var conn = await ConnectionMultiplexer.ConnectAsync(connectionRedisStr))
            {
                return await conn.GetDatabase().ListLeftPushAsync(queueName, redisvalue);
            }
        }

        /// <summary>Gets the length of the queue.</summary>
        public static long EnqueueListLength(RedisKey queueName)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                return conn.GetDatabase().ListLength(queueName);
            }
        }
        #endregion

        #region Dequeue
        // The pop result itself tells us whether the queue was empty, so the list
        // is no longer fetched in full with ListRange just to count its elements.
        public static string DequeueListPopLeft(RedisKey queueName)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                RedisValue redisValue = conn.GetDatabase().ListLeftPop(queueName);
                if (redisValue.IsNull)
                {
                    throw new Exception($"Queue {queueName} is empty");
                }
                return (string)redisValue;
            }
        }

        public static string DequeueListPopRight(RedisKey queueName)
        {
            using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
            {
                RedisValue redisValue = conn.GetDatabase().ListRightPop(queueName);
                if (redisValue.IsNull)
                {
                    throw new Exception($"Queue {queueName} is empty");
                }
                return (string)redisValue;
            }
        }

        public static async Task<string> DequeueListPopRightAsync(RedisKey queueName)
        {
            using (var conn = await ConnectionMultiplexer.ConnectAsync(connectionRedisStr))
            {
                RedisValue redisValue = await conn.GetDatabase().ListRightPopAsync(queueName);
                if (redisValue.IsNull)
                {
                    throw new Exception($"Queue {queueName} is empty");
                }
                return (string)redisValue;
            }
        }
        #endregion

        #endregion

        #region Distributed lock
        public static void LockByRedis(string key, int expireTimeSeconds = 10)
        {
            try
            {
                // Dispose the connection when done; the original never released it.
                using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
                {
                    IDatabase database = conn.GetDatabase();
                    expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds;
                    // Spins without any delay until the lock is acquired, which costs CPU and network traffic.
                    while (!database.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds)))
                    {
                    }
                }
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis lock failed, reason: {ex.Message}", ex);
            }
        }

        public static bool UnLockByRedis(string key)
        {
            try
            {
                using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr))
                {
                    // Only succeeds when the stored value matches this thread's ID.
                    return conn.GetDatabase().LockRelease(key, Thread.CurrentThread.ManagedThreadId);
                }
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis unlock failed, reason: {ex.Message}", ex);
            }
        }
        #endregion
    }
}

Helper for publish/subscribe, queues, and locks that shares a single connection (MyRedisSubPublishHelper):

using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace RedisPublishAndSubHelper
{
    public class MyRedisSubPublishHelper
    {
        private static readonly string redisConnectionStr = "12.32.12.54:6379,connectTimeout=10000,connectRetry=3,syncTimeout=10000";

        // One multiplexer is created once and shared by every call.
        private static readonly ConnectionMultiplexer connectionMultiplexer;

        static MyRedisSubPublishHelper()
        {
            connectionMultiplexer = ConnectionMultiplexer.Connect(redisConnectionStr);
        }

        #region Publish and subscribe
        // Subscribes to a channel; without a handler, received messages are printed to the console.
        public void SubScriper(string topicName, Action<RedisChannel, RedisValue> handler = null)
        {
            ISubscriber subscriber = connectionMultiplexer.GetSubscriber();
            ChannelMessageQueue channelMessageQueue = subscriber.Subscribe(topicName);
            channelMessageQueue.OnMessage(channelMessage =>
            {
                if (handler != null)
                {
                    handler.Invoke(channelMessage.Channel, channelMessage.Message);
                }
                else
                {
                    string msg = channelMessage.Message;
                    Console.WriteLine($"Received message: {msg}, Channel={channelMessage.Channel}");
                }
            });
        }

        public void PublishMessage(string topicName, string message)
        {
            ISubscriber subscriber = connectionMultiplexer.GetSubscriber();
            // Publish returns the number of clients that received the message.
            long publishLong = subscriber.Publish(topicName, message);
            Console.WriteLine($"Message published, receivers: {publishLong}");
        }
        #endregion

        #region Enqueue and dequeue
        public static async Task<long> EnqueueListLeftPushAsync(RedisKey queueName, RedisValue redisvalue)
        {
            return await connectionMultiplexer.GetDatabase().ListLeftPushAsync(queueName, redisvalue);
        }

        public static async Task<string> DequeueListPopRightAsync(RedisKey queueName)
        {
            IDatabase database = connectionMultiplexer.GetDatabase();
            // The pop result itself tells us whether the queue was empty, so the list
            // is no longer fetched in full with ListRangeAsync just to count its elements.
            RedisValue redisValue = await database.ListRightPopAsync(queueName);
            if (redisValue.IsNull)
            {
                throw new Exception($"Queue {queueName} is empty");
            }
            return (string)redisValue;
        }
        #endregion

        #region Distributed lock
        public static void LockByRedis(string key, int expireTimeSeconds = 10)
        {
            try
            {
                IDatabase database = connectionMultiplexer.GetDatabase();
                expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds;
                // Spins without any delay until the lock is acquired, which costs CPU and network traffic.
                while (!database.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds)))
                {
                }
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis lock failed, reason: {ex.Message}", ex);
            }
        }

        public static bool UnLockByRedis(string key)
        {
            try
            {
                IDatabase database = connectionMultiplexer.GetDatabase();
                // Only succeeds when the stored value matches this thread's ID.
                return database.LockRelease(key, Thread.CurrentThread.ManagedThreadId);
            }
            catch (Exception ex)
            {
                throw new Exception($"Redis unlock failed, reason: {ex.Message}", ex);
            }
        }
        #endregion
    }
}
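
LockByRedis above retries in a tight loop and uses the managed thread ID as the lock value, and thread IDs can repeat across processes. The following is a minimal sketch of a gentler variant, assuming a random token per acquisition, a short delay between attempts, and a wait timeout. RedisLockSketch, RunWithLockAsync, and the 20 ms delay are hypothetical choices for illustration. LockTakeAsync and LockReleaseAsync are the StackExchange.Redis calls that correspond to the synchronous ones used above.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

// Hypothetical helper for illustration; not part of the original post.
public static class RedisLockSketch
{
    // Runs criticalSection while holding the lock stored at lockKey.
    // Returns false when the lock could not be acquired within waitTimeout.
    public static async Task<bool> RunWithLockAsync(
        IDatabase db,
        string lockKey,
        Func<Task> criticalSection,
        TimeSpan lockExpiry,
        TimeSpan waitTimeout)
    {
        // A random token identifies this holder, so no other caller
        // (in any process) can release the lock by accident.
        string token = Guid.NewGuid().ToString("N");
        DateTime deadline = DateTime.UtcNow + waitTimeout;

        while (!await db.LockTakeAsync(lockKey, token, lockExpiry))
        {
            if (DateTime.UtcNow >= deadline)
            {
                return false;
            }
            // Assumed back-off; pausing avoids hammering Redis while waiting.
            await Task.Delay(20);
        }

        try
        {
            await criticalSection();
        }
        finally
        {
            // The release only succeeds if the key still holds our token.
            await db.LockReleaseAsync(lockKey, token);
        }
        return true;
    }
}
```

A caller might wrap the stock check as `await RedisLockSketch.RunWithLockAsync(db, "mstest", async () => { /* check and decrement stock */ }, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5));`. If the critical section runs longer than lockExpiry, the lock can expire while the work is still in progress, so the expiry should comfortably exceed the expected duration.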
