1. 程式人生 > >Redis中的Stream資料型別作為訊息佇列的嘗試

Redis中的Stream資料型別作為訊息佇列的嘗試

Redis的List資料型別作為訊息佇列,已經比較合適了,但存在一些不足,比如只能獨立消費,訂閱釋出又無法支援資料的持久化,相對前兩者,Redis Stream作為訊息佇列的使用更為有優勢。   相信球迷小夥伴們對文字直播這個東西都不陌生,時常在想,這個功能是怎麼實現的? 具體說就是用什麼技術實現最為合適?如何面對數以百萬計的讀壓力?廣告訊息是如何插播進來的?最後的歷史訊息如何歸檔,如何持久化儲存? 文字直播其實就是解說員作為生產者,生產訊息(文字資訊),各種客戶端作為消費者,消費資訊(重新整理文字內容)。 典型的訊息佇列實現,可以用佇列或者類似佇列的功能實現,這裡只是簡單想象一下,結合redis中的stream資料型別,來學習stream作為訊息佇列的功能實現。     1,生成者:生產者佇列的建立,與訊息的增刪改 1.1 建立並寫入訊息   語法:xadd queue_name Id filed value(filed value)      1,每一組訊息需要一個唯一的Id,*號表示伺服器自動生成ID,後面順序跟著一組或者多組訊息(filed value)      2,訊息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當前的訊息在毫米時間戳1527846880572時產生,並且是該毫秒內產生的第5條訊息。            訊息ID可以由伺服器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是後面加入的訊息的ID要大於前面的訊息ID。      3,訊息元素的的結構為key-value,必須成對出現,如果key或者value元素中有空格,必須用"abc  def"或者'abc  def'括起來
  1.2 生產者寫入訊息   語法:xadd queue_name *|Id filed value       1.3 xlen 當前stream的長度:xlen stream_name   xlen "NBA_Match_001" ,也就是上面寫入的10條訊息
     
1.4 限制某一個stream的最大長度,maxlen    依據先進先出的原則,自動刪除超出最長長度的訊息   xadd "NBA_Match_001" maxlen 50000 * "2019-07-13 08:26:39" "反擊哈騰,一條龍上籃得分"   
1.5 查詢訊息(查詢是生產者查詢自己生產的訊息,跟消費者的消費是兩碼事)

正向查詢
xrange "NBA_Match_001"              # 查詢所有訊息
xrange "NBA_Match_001" - +              # -表示最小值, +表示最大值
xrange "NBA_Match_001" 1562980142175-0 +    # 指定最小訊息ID的列表
xrange "NBA_Match_001"- 1562980142175-0      # 指定最大訊息ID的列表
反向查詢
xrevrange "NBA_Match_001"
xrevrange "NBA_Match_001" + -

xrevrange "NBA_Match_001" + 1562980142175-0
xrevrange "NBA_Match_001" 1562980142175-0 -

1.6 刪除訊息
  xdel stream_name id,刪除訊息並不是真正的物理刪除,佇列的長度不變,指示標記當前訊息被刪除

1.7 檢視stream屬性xinfo stream stream_name  1.8 del stream_name 刪除 stream :del NBA_Match_001 刪除本質上本Redis中的其他資料型別一致,stream本身就是一個key值,del key值就刪除了整個訊息的全部資訊。

 

    2 xread:獨立消費 類似於List,生產者往list中寫資料,消費者從list中讀資料,只能有一個消費者   2. 1,從頭部讀取訊息,從某個streams中讀取n條訊息,0-0只從頭開始,或者指定從streams的Id開始   xread count 1 streams "NBA_Match_001" 0-0   xread count 1 streams "NBA_Match_001" 1562980142175-0    2.2,從尾部讀取最新的一條訊息 xread count 1 streams "NBA_Match_001" $ 此時預設不返回任何訊息 xread  block 0 count 1 streams "NBA_Match_001" $ 以阻塞的方式讀取尾部最新的一條訊息,直到新的訊息的到來      3 多消費者xgroup :消費組,每個組中的消費者獨立消費stream中的訊息 典型的比如文字直播的安卓App客戶端,蘋果App客戶端,網頁客戶端等等。多個終端,都可以獨立地消費佇列裡面的

3.1 建立消費組

對訊息佇列"NBA_Match_001"建立了兩個消費組,一個是cg1,一個是cg2,比如網頁客戶端與App客戶端 

1,xgroup create "NBA_Match_001" cg1 0-0  #  表示從頭開始消費 建立消費組cg1,消費組必須繫結一個steam(NBA_Match_001),從頭(0-0 )開始消費"NBA_Match_001"中的訊息 2,xgroup create "NBA_Match_001" cg2 0-0  #  表示從頭開始消費 3,2 從消費組中建立消費者 xreadgroup指令可以進行消費組的組內消費
xreadgroup GROUP cg1 c1 count 1 streams "NBA_Match_001" >
>號表示從當前消費組的last_delivered_id後面開始讀 , 每當消費者讀取一條訊息,last_delivered_id變數就會前進  當一個組的消費則消費完全部訊息之後,就沒有新的訊息了  

每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的訊息會被每個消費組都消費到。
同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了訊息都會使遊標last_delivered_id往前移動。
每個消費者者有一個組內唯一名稱。

 

關於消費組,可能不太好理解,舉個例子就比較清楚
假設有2個消費組cg1,cg2,對於cg1,其組內共有3個消費者c1,、c2、c3。一個訊息佇列中共有5條訊息a,b,c,d,e,那麼一種可能的消費方式如下
a -> c1
b -> c2
c -> c3
d -> c1
e -> c2
也就是說3個消費者,對於訊息的消費是互斥的,消費的訊息是沒有交集的
而對於cg2,同樣可以消費a,b,c,d,e這5條訊息,不依賴於cg1消費組以及消費情況,同理,具體怎麼消費,取決於其組內的消費者數量
就好比體育直播的客戶端,正常情況下,網頁客戶端可以收到所有的直播訊息,手機App客戶端也可以收到所有的直播訊息一樣,不同消費組間對訊息的消費互不干擾。

 

 

4 多個生產者和多個消費者

  這種情況類似以上,不用的是增加了多個消費者,在上面的基礎上做了擴充套件。
  其實不難想象,文字直播插播的廣告訊息,可能是類似如下結構,是另外一個獨立的生產者,與文字直播員一樣生成寫入訊息到佇列,然後客戶端看到的就是夾雜了廣告的直播。

 

 

目前就個人認識而言,stream資料型別實現訊息佇列並不完美,最大的問題就是單點壓力問題:這裡是說單點壓力,而不是單點故障,stream型別資料,其實從邏輯上看,是一個key值(stream_name),跟著一系列value(訊息),這些訊息只能儲存在一個Redis例項中,如何緩解多個消費者對單個Key值中的訊息消費壓力?說來說去,不就是想說kafka的partition麼……

 

 

參考:

http://database.51cto.com/art/201812/588189.htm

https://www.zhihu.com/question/279540635