1. 程式人生 > >Redis的批量操作是什麼?怎麼實現的延時佇列?以及訂閱模式、LRU。

Redis的批量操作是什麼?怎麼實現的延時佇列?以及訂閱模式、LRU。

## 前言 這次的內容是我自己為了總結Redis知識而擴充的,上一篇其實已經總結了幾點知識了,但是Redis的強大,以及適用範圍之廣可不是單單一篇博文就能總結清的。所以這次準備繼續總結,因為第一個問題,Redis的批量操作,是我在面試過程中被真實問到的,當時沒答上來,也是因為確實沒了解過Redis的批量操作。 當時的問題,我還記得比較清晰:Redis執行批量操作的功能是什麼?使用場景就是搞促銷活動時,會做預快取,會往快取裡放大批資料,如果直接放的話那麼會很慢,怎麼能提高效率呢? ## Redis的批量操作-管道(pipeline) 首先Redis的管道(pipeline)並不是Redis服務端提供的功能,而是Redis客戶端為了減少網路互動而提供的一種功能。 正常的一次Redis網路互動如下: ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20201231231918737.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmlNb2Vy,size_30,color_c8cae6,t_70) **pipeline**主要就是將多個請求合併,進行一次提交給Redis伺服器,Redis伺服器將所有請求處理完成之後,再一次性返回給客戶端。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20201231233247603.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmlNb2Vy,size_30,color_c8cae6,t_70) 下面我們分析一下**pipeline**的原理 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210101093316355.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmlNb2Vy,size_60,color_c8cae6,t_70) **pipeline**的一個互動過程是這樣的: 1. 客戶端程序呼叫`write`命令將訊息寫入到作業系統核心為套接字分配的**傳送緩衝區send buffer**。 2. 客戶端作業系統通過網路路由,將**send buffer**中的資料傳送給伺服器作業系統為套接字分配的接**收緩衝區 receive buffer**。 3. 服務端程序呼叫`read`命令從**receive buffer**中取出資料進行處理,然後呼叫`write`命令將相應資訊寫入到服務端的**send buffer**中。 4. 服務端作業系統通過網路路由,將**send buffer**中的資料傳送給客戶端作業系統的**receive buffer**。 5. 客戶端程序呼叫read命令將資料從**receive buffer**中取出進行業務處理。 在使用**pipeline**時需要注意: > - **pipeline執行的操作,和mget,mset,hmget這樣的操作不同,pipeline的操作是不具備原子性的。** > - **還有在叢集模式下因為資料是被分散在不同的slot裡面的,因此在進行批量操作的時候,不能保證操作的資料都在同一臺伺服器的slot上,所以叢集模式下是禁止執行像mget、mset、pipeline等批量操作的,如果非要使用批量操作,需要自己維護key與slot的關係。** > - **pipeline也不能保證批量操作中有命令執行失敗了而中斷,也不能讓下一個指令依賴上一個指令,如果非要這樣的複雜邏輯,建議使用lua指令碼來完成操作。** ## Redis實現訊息佇列和延時佇列 ### 訊息佇列 Redis的實現訊息佇列可以用list來實現,通過lpush與rpop或者rpush與lpop結合來實現訊息佇列。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210101184213391.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmlNb2Vy,size_30,color_c8cae6,t_70) 但是若是list為空後,無論是lpop還是rpop都會持續的獲取list中的資料,若list一直為空,持續的拉取資料,一是會增加客戶端的cpu利用率,二是也增高了Redis的QPS,解決方案是使用**blpop**或**brpop**來代替lpop或rpop。 其實blpop和brpop的作用是bloking pop,就是阻塞拉取資料,當訊息佇列中為空時就會停止拉取,有資料後立即恢復拉取。 但是當沒有資料的時候,**阻塞拉取**,就會一直阻塞在那裡,時間久了就成了空閒連線,那麼Redis伺服器一般會將時間閒置過久的連線直接斷掉,以減少連線資源。所以還要檢測**阻塞拉取**丟擲的異常然後進行重試。 >另外一點,就是Redis實現的訊息佇列,沒有ACK機制,所以想要實現訊息的可靠性,還要自己實現當訊息處理失敗後,能繼續拋回佇列。 ### 延時佇列 用Redis實現延時佇列,其實就是使用zset來實現,將訊息序列化成一個字串(可以是json格式),作為為`value`,訊息的到期處理時間做為`score`,然後用多執行緒去輪詢zset來獲取到期訊息進行處理。 多執行緒輪詢處理,保證了可用性,但是要做冪等或鎖處理,保證不要重複處理訊息。 主要的實現程式碼如下。 ```java /** * 放入延時佇列 * @param queueMsg */ private void delay(QueueMsg queueMsg){ String msg = JSON.toJSONString(queueMsg); jedis.zadd(queueKey,System.currentTimeMillis()+5000,msg); } /** * 處理佇列中從訊息 */ private void lpop(){ while (!Thread.interrupted()){ // 從佇列中取出,權重為0到當前時間的資料,並且數量只取一個 Set strings = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); // 如果訊息為空,就歇會兒再取。 if(strings.isEmpty()){ try { //休息一會兒 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); break; } continue; } String next = strings.iterator().next(); // 如果搶到了訊息 if(jedis.zrem(queueKey,next)>0){ // 反序列化後獲取到訊息 QueueMsg queueMsg = JSON.parseObject(next, QueueMsg.class); // 進行訊息處理 handleMsg(queueMsg); } } } ``` ## 訂閱模式 Redis的主題訂閱模式,其實並不想過多總結,因為由於它本身的一些缺點,導致它的應用場景比較窄。 前面總結的用Redis的list實現的訊息佇列,雖然可以使用,但是並不支援訊息多播的場景,即一個生產者,將訊息放入到多個佇列中,然後多個消費者進行消費。 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210102103013272.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_SmlNb2Vy,size_30,color_c8cae6,t_70) 這種訊息多播的場景常用來做分散式系統中的解耦。用哦`publish`進行生產者傳送訊息,消費者使用`subscribe`進行獲取訊息。 例如:我向jimoerChannel傳送了一條訊息 `b-tree` ```powershell 127.0.0.1:6379> publish jimoerChannel b-tree (integer) 1 ``` 訂閱這個渠道的消費者立馬收到了一條b-tree的訊息。 ```powershell 127.0.0.1:6379> subscribe jimoerChannel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "jimoerChannel" 3) (integer) 1 1) "message" 2) "jimoerChannel" 3) "b-tree" ``` 我前面也說到了,Redis的pub/sub訂閱模式,其實最大的缺點就是,訊息不能持久化,這樣就導致,若是消費者掛了或是沒有消費者,那麼訊息就會被直接丟棄。因為這個原因,所以導致他的使用場景比較少。 ## Redis的過期策略 Redis的過期策略是適用於所有資料結構的。資料一到過期時間就自動刪除,Redis會將設定了過期時間的key 放置在一個字典表裡。 ### 定期刪除 Redis會定期遍歷字典表裡面數據來刪除過期的Key。 **Redis預設的定期刪除策略是每秒進行10次過期掃描,即每100ms掃描一次。並不是掃描全部設定了過期時間的key,而是隨機掃描20個key,刪除掉已經過期的key,如果過期的比率超過25%,那麼就繼續進行掃描。** ### 惰性刪除 因為定期刪除是隨機抽取一些key來進行過期刪除,所以如果key並沒有被定期掃描到,那麼過期的key就不會被刪除。所以Redis還提供了惰性刪除的策略,**就是當去查詢某些key的時候,若是key已經過期了,那麼就會刪除key,然後返回null。** 另外一點當在叢集條件下,主從同步情況中,主節點中的key過期後,會在aof中生成一條刪除指令,然後同步到從節點,這樣的從節點在接收到aof的刪除指令後,刪除掉從節點的key,因為主從同步的時候是非同步的所以,短暫的會出現主節點已經沒有資料了,但是從節點還存在。 但是若是定期刪除也沒有掃描到key,而且好長時間也沒去去使用key,那麼這部分過期的key就會一直佔用的記憶體。 所以Redis又提供了記憶體淘汰機制。 ### 記憶體淘汰機制 當Redis的記憶體出現不足時,就會持續的和磁碟進行互動,這樣就會導致Redis卡頓,效率降低等情況。這在線上是不允許發生的,所以Redis提供了配置引數 `maxmemory` 來限制記憶體超出期望大小。 當記憶體使用情況超過maxmemory的值時,Redis提供了以下幾種策略,來讓使用者通過配置決定該如何騰出記憶體空間來繼續提供服務。 - **`noeviction`** 不會繼續提供寫請求(del請求可以),讀請求可以,寫請求會報錯,這樣保證的資料不會丟失,但是業務不可用,這是預設的策略。 - **`volatile-lru`** 會將設定了過期時間的key中,淘汰掉最近最少使用的key。沒有設定過期時間的key不會被淘汰,保證了需要持久化的資料不丟。 - **`volatile-ttl`** 嘗試將設定了過期時間的key中,剩餘生命週期越短,越容易被淘汰。 - **`volatile-random`** 嘗試將從設定了過期時間的key中,隨機選擇一些key進行淘汰。 - **`allkeys-lru`** 從所有key中,淘汰掉最近最少使用的key。 - **`allkeys-random`** 從所有key中,隨機淘汰一部分key。 那麼具體設定成哪種淘汰策略呢? **這就是要看在使用Redis時的具體場景了,如果只是用Redis做快取的話,那麼可以配置allkeys-lru或allkey-random,客戶端在寫快取的時候並不用攜帶著過期時間。若是還想要用持久化的功能,那麼就應該使用volatile-開頭的策略,這樣可以保證每月設定過期時間的key不會被淘汰。** 記憶體淘汰策略的配置如下: ```powershell # 最大使用記憶體 maxmemory 5m # 記憶體淘汰策略 The default is:noeviction maxmemory-policy allkeys-lru ``` ### LRU演算法 LRU演算法的實現,其實可以靠一個連結串列。連結串列按照使用情況來進行排序,當空間不足時,會剔除掉尾部的資料。當某個元素被訪問時它會被移動到連結串列頭。 在真實的面試中,若是讓寫出LRU演算法,我認為可以使用Java中的LikedHashMap來實現,因為LikedHashMap已經實現了基本的LRU功能,我只需要封裝一下就改造成了自己的了。 ```java /** * @author Jimoer * @description */ public class MyLRUCache { // lru容量 private int lruCapacity; // 資料容器(記憶體) private Map dataMap; public MyLRUCache(int capacity){ this.lruCapacity = capacity; // 設定LinkedHashMap的初始容量為LRU的最大容量, // 擴容因子為預設的0.75,第三個引數是否將資料按照訪問順序排序。 dataMap = new LinkedHashMap(capacity, 0.75f, true){ @Override protected boolean removeEldestEntry(Map.Entry eldest) { // 當資料量大於lruCapacity時,移除掉最老使用的資料。 return super.size()>lruCapacity; } }; } public V get(K k){ return dataMap.get(k); } public void put(K key, V value){ dataMap.put(key,value); } public int getLruCapacity() { return lruCapacity; } public Map getDataMap() { return dataMap; } } ``` 測試程式碼: ```java @Test public void lruTest(){ // 記憶體容量為3,即儲存3條資料後,再放入資料,就會將最老使用的資料刪除 MyLRUCache myLRUCache = new MyLRUCache(3); myLRUCache.put("1k","張三"); myLRUCache.put("2k","李四"); myLRUCache.put("3k","王五"); // 容量已滿 System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap())); // 繼續放入資料,該刪除第一條資料為第四條資料騰出空間了 myLRUCache.put("4k","趙六"); // 打印出結果 System.out.println("myLRUCache:"+JSON.toJSONString(myLRUCache.getDataMap())); } ``` 執行結果: ```java myLRUCache:{"1k":"張三","2k":"李四","3k":"王五"} myLRUCache:{"2k":"李四","3k":"王五","4k":"趙六"} ``` ## 總結 好了,Redis的相關知識,就總結到這裡了,算上前面兩篇博文([Redis基礎資料結構總結](https://blog.csdn.net/qq_35165000/article/details/109524499)、[你說一下Redis為什麼快吧,怎麼實現高可用,還有持久化怎麼做的](https://www.cnblogs.com/jimoer/p/14204650.html)),這是Redis的第三篇了,這一篇博文也是新年的第一篇,元旦假期在家花了兩天時間,自己學習自己總結。元旦假期結束後,我要繼續面試了,後面我會繼續將我面試中遇到的各種問題,總結出來,一是增加自己的知識面,二也將知識進行的傳播。 畢竟獨樂樂不眾樂