Redis 內建了一個 Kafka:Stream
流(Stream)是Redis從5.0.0版本新加入的一個數據結構,是一個類似於Kafka的訊息系統。該結構相關的大部分命令使用字母X
開頭 如XADD
,XLEN
,XRANGE
等。
在開始詳細敘述之前,先說明一下:
本文內容主要是結合官網文章Introduction to Redis Streams 和個人理解整理而成。另外想吐槽下自己,遊戲玩多了也不太好,Stream
總是不自覺的寫成了Steam
。
向Stream中新增資料
命令格式如
如
返回的1541558444516-0
是訊息id。 可以看到上面的命令ID引數傳的是*
, 代表由系統生成的ID,當然,你可以顯式的指定訊息ID(基本不太常用),對於這個ID有幾點需要注意:
-
ID必須是由
-
連線的兩部分,前一部分預設情況下是當前Server當前的毫秒時間戳,後一部分是一個無符號64位長整型序列號; -
同一個KEY下,後加入的ID一定要比已加入的ID大。
獲取Stream長度
使用XLEN
, 如
從Stream中獲取資料
有三種方式從Stream中獲取資料
按照範圍查詢
包括XRANGE
和XREVRANGE
兩個命令 ,分別是正序和反序, 以正序XRANGE
為例:
從Stream這個key中返回ID範圍是start
到end
的[前count個]
資料。 如
-
,+
分別表示最小和最大ID。
監聽(XREAD)
XREAD
命令
如
執行完成後再執行:
發現Stream的長度沒有變化,也就是說,XREAD
不會刪除Stream裡的資料。
上面的這個例子是一個非阻塞的方式監聽。當使用BLOCK
引數,並傳遞一個超時時間(0為永不超時),將啟動一個阻塞方式的監聽。 特殊的ID$
表示從最新的ID開始監聽。如:
啟動一個監聽客戶端:
該命令阻塞等待, 此時另起一個客戶端:
等待的客戶端收到訊息:
如果有多個客戶端都在監聽同一個流,這些客戶端都可以得到流中的資料。
消費者組 (Consumer Group)
機制說明
涉及三個命令 分別是
-
XGROUP
: 建立或者銷燬一個 Consumer Group, 也可以從Consumer Group中刪除一個 Consumer -
XREADGROUP
: 指定 Consumer Group 中的一個Consumer,消費一條訊息 -
XACK
: 在XREADGROUP
呼叫時不指定NOACK
時需要顯式呼叫XACK
命令 來確認該訊息已被正確處理,可以刪除。
消費者組的消費方式可以用下圖表示
一個訊息 msg 可通過 group1 和 group2 分發,並且 group1 中的 msg 會被 consumer1 或者 consumer2 消費,group2 中的 msg 會被 consumer3 或者 comsumer4 消費。
建立/消費/確認
使用XGROUP
命令建立一個consumer group,如
這樣就建立了一個名為foocg1的 consumer group, 其中$
表示該組將要消費當前時間開始的訊息,然後我們向Stream中新增一些訊息:
此時,使用foocg1下的c1消費者來消費一條訊息
其中最後的ID欄位 指定為>
, 表示只獲取那些從來沒有被分發的訊息。
我們繼續消費一條訊息
然後,再消費歷史上所有的資料
注意這裡ID傳的是0-0
, 此時會發現消費的是第一條訊息。也就是說,沒有經過XACK的訊息依舊會保留在佇列中。
執行XACK
操作:
此時再去消費歷史資料
發現已經獲取不到被XACK
的訊息了,當所有的歷史資料全部被XACK
後:
一個偽碼錶示的客戶端
一個消費者組的實現的偽碼錶示可以寫作:
XPENDING 和 XCLAIM
XPENDING
可以獲取訊息系統中已經分發但是未被XACK
的訊息的情況
如:
表示有8個未確認訊息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消費者有8個未確認的訊息。傳遞start, end, count引數可以獲取指定範圍指定數目的未確認訊息的詳細資訊,傳遞consumer可獲取指定consumer下未確認資訊列表。
XCLAIM
可以將未被確認的訊息重新宣告給其他消費者
如下面命令可以獲取到一條原屬於c1的訊息未被確認:
下面命令可以將將原本屬於 消費者c1 的訊息 1541581753377-0 在等待確認的時間>30000情況下重新宣告給c2
此時
XPENDING
和XCLAIM
可以用來處理當一個消費者獲取到一個訊息後,執行失敗導致無法執行XACK
,此時這個訊息就永遠不會進行確認已消費
操作的情形。
其他命令
-
XINFO
可以檢視流的一些資訊 -
XTRIM
可以獲得一個有長度上限的Stream -
XDEL
可以從Stream中刪除訊息
這些命令可以在官網找到詳細的說明,這裡就不再贅述了。
其他說明
-
Stream支援AOF和RDB格式的持久化
-
當呼叫XDEL等造成Stream長度為0時,為了保留可能存在的Consumer Group資訊,Stream不會被刪除。
-
Redis Cluster場景下,由於Key存在於單節點下,所以同一個流的所有訊息也會位於同一個節點下。
-
由於同一個Stream(Key)下的所有訊息位於同一節點,類比Kafka分割槽更像是使用多個Key形成多個Stream來處理本質上是同一類訊息的一個Stream,而不是Stream下的Consumer Group。