1. 程式人生 > >藉助Redis完成延時任務

藉助Redis完成延時任務

## 背景 相信我們或多或少的會遇到類似下面這樣的需求: 第三方給了一批資料給我們處理,我們處理好之後就通知他們處理結果。 大概就是下面這個圖說的。 ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9nm5nrzDiaYCZwq2aQSHM2pOcnkbsEabkBibwFGg28T0TGt1mIq2j84vA/0?wx_fmt=png) 本來在處理完資料之後,我們就會馬上把處理結果返回給對方,但是對方要求我們處理速度不能過快,要有一種人為處理的效果。 換句話就是說,就算是處理好了,也要晚一點再執行通知操作。 這就是一個典型的延時任務。 延時,那還不簡單,執行完之後,讓它`Sleep`一下就好了,這樣就達到目標了。 `Sleep`一下確定是最容易實現的一種方案,但是試想一下,資料的數量不斷的增加,這樣`Sleep`真的好嗎?答案是否定的。 延時佇列,是處理這個場景最為妥當的方案。 RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達到相應的效果。 如果不具備佇列條件,又要怎麼處理呢?還可以藉助Redis來完成這項工作。 MQ不一定每個公司都會用,但Redis應該80%以上的都會用吧。 ## 處理方案 Redis這邊,可用的方案有兩種,下面分別來介紹一下。 ### #1 鍵的過期時間 在設定快取的時候,我們比較多情況下都會設定一個快取的過期時間,這個時間過期後,會重新去資料來源拿資料回來。 可以基於這個過期時間結合Redis的**keyspace notifications**共同完成。 keyspace notifications裡面包含了非常多的事件,這裡只關注`EXPIRE`,這個是和過期有關的。 只要訂閱了`__keyevent@0__:expired`這個主題,當有key過期的時候,就會收到對應的資訊。 >注:主題@後面的0,指的是db 0. 要想使用這個特性,必不可少的一步是修改Redis預設的配置,把`notify-keyspace-events`設定成`Ex`。 ```conf ############################# Event notification ############################## # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # ......... # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered. notify-keyspace-events "Ex" ``` 其中 E 指的是鍵事件通知,x 指的是過期事件。 根據這個特性,重新調整一下流程圖: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Qs3jcmaaOO4g76X7gMr7WVVZ1DUsZWaPxXiaQPXr0nsV4mNVNNwxicKQ/0?wx_fmt=png) 應該也比較好懂,下面通過簡單的程式碼來實現一下這種方案。 首先是處理完資料及往Redis寫資料。 ```cs public async Task DoTaskAsync() { // 資料處理 // ... // 後續操作要延時,把Id記錄下來 var taskId = new Random().Next(1, 10000); // 要延遲的時間 int sec = new Random().Next(1, 5); // 可以加個重試機制,預防單次執行失敗。 await RedisHelper.SetAsync($"task:{taskId}", "1", sec); } ``` 還需要回傳結果的後臺任務,這個任務就是去訂閱上面說的鍵過期事件,然後回傳結果。 這裡可以藉助`BackgroundService`來訂閱處理。 ```cs public class SubscribeTaskBgTask : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var keyPrefix = "task:"; RedisHelper.Subscribe( ("__keyevent@0__:expired", arg => { var msg = arg.Body; Console.WriteLine($"recive {msg}"); if (msg.StartsWith(keyPrefix)) { // 取到任務Id var val = msg.Substring(keyPrefix.Length); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回傳處理結果給第三方,這裡可以考慮這個併發鎖,避免多例項都處理了這個任務。 // .... } } )); return Task.CompletedTask; } } ``` 這裡有一個要注意的地方,要在key裡面包含任務的Id,因為訂閱處理的時候,只能拿到一個key,後續能做的操作也只是基於這個key。 上面的例子,是用了`task:任務Id`的形式,所以在訂閱處理的時候,只處理以`task:`開頭的那些key。 效果如下: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9pZBI4p5k1cXNNZDOiaJdfSJQSFJxZKJHL1n0Dz3HYibaibCCNrjAAV5yA/0?wx_fmt=png) 這種方案,直觀上是非常簡單的,不過這種方案會遇到一個小問題。 當一個key過期後,並不一定會馬上收到通知,這個也是會有一定的延時的,取決於Redis的內部機制。 Redis Keyspace Notifications文件的最後一段也提到了這個問題。 ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9QLDLm6N9OaWUdfkIpKa8drEmBtKtL8xPIVPo3JQnxAf1nLJ1TOxzXQ/0?wx_fmt=png) 所以用這種方案的時候,要考慮一下,你的延時是不是要及時~~ ### #2 有序集合 有序集合是Redis中一種十分有用的資料結構,它的本質其實就是集合加了一個排序的功能,每個集合裡面的元素還會有一個分值的屬性。 它提供了一個可以獲取指定分值範圍內的元素,這個也就是我們的出發點。 在這個場景下,什麼東西可能作為這個分值呢?現在只有一個處理任務的Id還有一個延遲的時間,Id肯定不行,那麼也只能是延遲時間來作這個分值了。 延遲1秒,5秒,1分鐘,這個都是比較大粒度的時間,這裡要轉化一下,用時間戳來代替這些延遲的時間。 假設現在的時間戳是 `1584171520`, 要延遲5秒執行,那麼執行任務的時間就是 `1584171525`,在當前時間戳的基礎上加個5秒,就是最終要執行的了。 到時有序集合中存的元素就會是這樣的 ``` 任務Id-1 1584171525 任務Id-2 1584171528 任務Id-3 1584171530 ``` 接下來就是要怎麼取出這些任務的問題了! 把當前時間戳當成是取數的最大分值,0作為最小分值,這個時候取出的元素就是應該要執行回傳的任務了。 根據這個方案,重新調整一下流程圖: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Mfj0ibwNcj2JIgCwfEcBBOicqib9ibedfibxOh0b18DFPoBg3QnOGOsiayJg/0?wx_fmt=png) 交代清楚了思路,再來點程式碼,加深一下理解。 首先還是處理完資料後往Redis寫資料。 ```cs public async Task DoTaskAsync() { // 資料處理 // ... // 後續操作要延時,把Id記錄下來 var taskId = new Random().Next(1, 10000); var cacheKey = "task:delay"; int sec = new Random().Next(1, 5); // 要執行這個任務的時間戳 var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds(); await RedisHelper.ZAddAsync(cacheKey, (time, taskId)); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}"); } ``` 後面就是輪訓有序集合裡面的元素了,這裡同樣是藉助`BackgroundService`來處理。 ```cs public class SubscribeTaskBgTask : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var cacheKey = "task:delay"; while (true) { // 先取,後刪,不具備原子性,可考慮用lua指令碼來保證原子性。 var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0); if (vals != null && vals.Length > 0) { var val = vals[0]; var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals); if (rmCount > 0) { // 要把這個元素先刪除成功了,再執行任務,不然會重複 Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回傳處理結果給第三方,這裡可以考慮這個併發鎖,避免多例項都處理了這個任務。 // .... } } else { // 沒有資料,休眠500ms,避免CPU空轉 await Task.Delay(500); } } } } ``` 效果如下: ![](https://mmbiz.qpic.cn/mmbiz_png/yrY77YblI8LcWL5ECHd9nmh78icUYTRY9Ke3fqWKcHzRFXglvR5ftqibDBDau9XIPm7ZgUeYJUXreJbatTO7gQmA/0?wx_fmt=png) ## 參考文章 [https://redis.io/topics/notifications](https://redis.io/topics/notifications) [https://zhuanlan.zhihu.com/p/87113913](https://zhuanlan.zhihu.com/p/87