Redis Stream
Redis Stream
Redis Stream 是 Redis 5.0 版本新增加的資料結構。
Redis Stream 主要用於訊息佇列(MQ,Message Queue),Redis 本身是有一個 Redis 釋出訂閱 (pub/sub) 來實現訊息佇列的功能,但它有個缺點就是訊息無法持久化,如果出現網路斷開、Redis 宕機等,訊息就會被丟棄。
簡單來說釋出訂閱 (pub/sub) 可以分發訊息,但無法記錄歷史訊息。
而 Redis Stream 提供了訊息的持久化和主備複製功能,可以讓任何客戶端訪問任何時刻的資料,並且能記住每一個客戶端的訪問位置,還能保證訊息不丟失。
Redis Stream 的結構如下所示,它有一個訊息連結串列,將所有加入的訊息都串起來,每個訊息都有一個唯一的 ID 和對應的內容:
每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加訊息時自動建立。
上圖解析:
- Consumer Group :消費組,使用 XGROUP CREATE 命令建立,一個消費組有多個消費者(Consumer)。
- last_delivered_id :遊標,每個消費組會有個遊標 last_delivered_id,任意一個消費者讀取了訊息都會使遊標 last_delivered_id 往前移動。
- pending_ids :消費者(Consumer)的狀態變數,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經被客戶端讀取的訊息,但是還沒有 ack (Acknowledge character:確認字元)。
訊息佇列相關命令:
- XADD - 新增訊息到末尾
- XTRIM - 對流進行修剪,限制長度
- XDEL - 刪除訊息
- XLEN - 獲取流包含的元素數量,即訊息長度
- XRANGE - 獲取訊息列表,會自動過濾已經刪除的訊息
- XREVRANGE - 反向獲取訊息列表,ID 從大到小
- XREAD - 以阻塞或非阻塞方式獲取訊息列表
消費者組相關命令:
- XGROUP CREATE - 建立消費者組
- XREADGROUP GROUP - 讀取消費者組中的訊息
- XACK - 將訊息標記為"已處理"
- XGROUP SETID - 為消費者組設定新的最後遞送訊息ID
- XGROUP DELCONSUMER - 刪除消費者
- XGROUP DESTROY - 刪除消費者組
- XPENDING - 顯示待處理訊息的相關資訊
- XCLAIM - 轉移訊息的歸屬權
- XINFO - 檢視流和消費者組的相關資訊;
- XINFO GROUPS - 列印消費者組的資訊;
- XINFO STREAM - 列印流資訊
XADD
使用 XADD 向佇列新增訊息,如果指定的佇列不存在,則建立一個佇列,XADD 語法格式:
XADD key ID field value [field value ...]
- key :佇列名稱,如果不存在就建立
- ID :訊息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
- field value : 記錄。
例項
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
2) 1) "name"
2) "Sara"
3) "surname"
4) "OConnor"
2) 1) "1601372323627-1"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
redis>
XTRIM
使用 XTRIM 對流進行修剪,限制長度, 語法格式:
XTRIM key MAXLEN [~] count
- key :佇列名稱
- MAXLEN :長度
- count :數量
例項
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
2) 1) "field1"
2) "A"
3) "field2"
4) "B"
5) "field3"
6) "C"
7) "field4"
8) "D"
127.0.0.1:6379>
redis>
XDEL
使用 XDEL 刪除訊息,語法格式:
XDEL key ID [ID ...]
- key:佇列名稱
- ID :訊息 ID
例項
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
2) 1) "a"
2) "1"
2) 1) 1538561701744-0
2) 1) "c"
2) "3"
XLEN
使用 XLEN 獲取流包含的元素數量,即訊息長度,語法格式:
XLEN key
- key:佇列名稱
例項
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
XRANGE
使用 XRANGE 獲取訊息列表,會自動過濾已經刪除的訊息 ,語法格式:
XRANGE key start end [COUNT count]
- key :佇列名
- start :開始值, - 表示最小值
- end :結束值, + 表示最大值
- count :數量
例項
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) "1601372577811-1"
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
redis>
XREVRANGE
使用 XREVRANGE 獲取訊息列表,會自動過濾已經刪除的訊息 ,語法格式:
XREVRANGE key end start [COUNT count]
- key :佇列名
- end :結束值, + 表示最大值
- start :開始值, - 表示最小值
- count :數量
例項
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
2) 1) "name"
2) "Ngozi"
3) "surname"
4) "Adichie"
redis>
XREAD
使用 XREAD 以阻塞或非阻塞方式獲取訊息列表 ,語法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count :數量
- milliseconds :可選,阻塞毫秒數,沒有設定就是非阻塞模式
- key :佇列名
- id :訊息 ID
例項
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
XGROUP CREATE
使用 XGROUP CREATE 建立消費者組,語法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key :佇列名稱,如果不存在就建立
- groupname :組名。
- $ : 表示從尾部開始消費,只接受新訊息,當前 Stream 訊息會全部忽略。
從頭開始消費:
XGROUP CREATE mystream consumer-group-name 0-0
從尾部開始消費:
XGROUP CREATE mystream consumer-group-name $
XREADGROUP GROUP
使用 XREADGROUP GROUP 讀取消費組中的訊息,語法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group :消費組名
- consumer :消費者名。
- count : 讀取數量。
- milliseconds : 阻塞毫秒數。
- key : 佇列名。
- ID : 訊息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >