寫在前面

我一直以來使用redis的時候,很多低烈度需求(併發要求不是很高)需要用到訊息佇列的時候,在專案本身已經使用了Redis的情況下都想直接用Redis來做訊息佇列,而不想引入新的服務,kafka和RabbitMQ等;

奈何這兄弟一直不給力;

雖然 Redis 的Pub/Sub 是實現了釋出/訂閱的,但這傢伙最坑的是:丟資料

由於Pub/Sub 只是簡單的實現了釋出訂閱模式,簡單的溝通起生產者和消費者,當接收生產者的資料後並立即推送或者說轉發給訂閱消費者,並不會做任何的持久化、儲存操作。由此:

  1. ​ 消費者(客戶端)掉線;
  2. ​ 消費者未訂閱(所以使用的時候一定記得先訂閱再生產);
  3. ​ 服務端宕機;
  4. ​ 消費者消費不過來,訊息堆積(生產資料受資料緩衝區限制);

以上情況都會導致生產資料的丟失,基於上坑,據我所知大家很少使用Pub/Sub ;

不過官方的哨兵叢集通訊的時候就是用的Pub/Sub;

然後,各路大佬結合佇列、阻塞等等實現了各種各樣的方案,主要是使用:BLPOP+LPUSH 的實現

這裡就不一一展開了,有興趣請看葉老闆文章

可能是各種實現都會帶來各種的問題,redis的官方也看到了社群的掙扎。終於,到了Redis5.0,官方帶來了訊息佇列的實現:Stream

Redis Stream介紹

簡單來說Redis Stream 就是想用Redis 做訊息佇列的最佳推薦;

XADD--釋出訊息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再發一條
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631628890025-0"

其中的'*'表示讓 Redis 自動生成唯一的訊息 ID,格式是 「時間戳-自增序號」

XREAD--訂閱訊息

訂閱訊息

XREAD COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"

'0-0' 表示從開頭讀取

如果需繼續拉取下一條,需傳入上一條訊息的id

阻塞等待訊息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待訊息id ‘1631628890025-0’ 後的訊息

50000 阻塞時間(毫秒) ‘0’ 表示無限期阻塞

從到這裡就可以看出 Pub/Sub多端訂閱的最大優點,Stream也是支援的。有的同學很快就發現問題了,這裡多端訂閱後,沒有訊息確認ACK機制。

沒錯,因為現在所有的消費者都是訂閱共同的訊息,多端訂閱,如果某個客戶端ACK某條訊息後,其他端消費不了,就實現不了多端消費了。

由此,引出 分組:GROUP

GROUP--訂閱分組訊息(多端訂閱)

同樣先發布訊息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631629084083-0"

XGROUP CREATE 建立分組

建立分組1

XGROUP CREATE stream1 group1 0-0
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0
OK

‘0-0’ 表示從開頭讀取

'>' 表示讀取最新,未被消費過的訊息

XREADGROUP--分組讀取

分組 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >

consumer1 消費者名稱, redis伺服器會記住第一次使用的消費者名稱;


127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
(nil)

同樣

‘0-0’ 表示從開頭讀取

'>' 表示讀取最新,未被消費過的訊息 (可以看到命令執行第二遍已經讀不到新訊息了)

分組 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19

可以看到可以讀到同樣的訊息,多端訂閱沒有問題;

當然分組也支援阻塞讀取:

#和XREAD一樣
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 #分組阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 >

‘0’ 表示無限期阻塞,單位(毫秒)

XPENDING--待處理訊息

訊息使用XREADGROUP 讀取後會進入待處理條目列表(PEL);

我們看看:

 XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
2) "4"

表示:

  1. (integer) 4 //表示當前消費者組的待處理訊息的數量
  2. "1631628884174-0" //訊息最大id
  3. "1631629084083-0" //最小id
      1. "consumer1" // 消費者名稱
      2. "4" //消費者待處理訊息數量

XACK--刪除已處理訊息(訊息確認機制)

我們已經知道group2待處理訊息有4條,我們從頭讀取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"

假設最後一條訊息 ‘1631629084083-0’ 我已處理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
2) "3"

可以清楚看到goroup2 待處理訊息剩下3條;

這時 Redis 已經把這條訊息標記為「處理完成」不再追蹤;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

釋出:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

訂閱:

static async Task CsRedisStreamConsumer()
{
Console.WriteLine("CsRedis StreamConsumer start!"); var csRedis = new CSRedis.CSRedisClient(_connstr);
csRedis.XAdd(_keyStream, "*", ("name", "message1")); try
{
csRedis.XGroupCreate(_keyStream, _nameGrourp);
}
catch { } (string key, (string id, string[] items)[] data)[] product;
(string Pid, string Platform, string Time) data = (null, null, null); while (true)
{
try
{
product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}"); product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
{
Console.WriteLine($" {value}");
}); //csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
}
}
catch (Exception)
{
//throw;
}
}
}

CSRedisCore

這裡的超時報錯可通過修改連線引數:syncTimeout 解決

CSRedisCore支援阻塞讀取;

StackExchange.Redis

釋出:

db.StreamAdd(_keyStream, "name", "message1", "*");

訂閱:

static async Task StackExchangeRedisStreamConsumer()
{
Console.WriteLine("StackExchangeRedis StreamConsumer start!"); var redis = ConnectionMultiplexer.Connect(_connstr);
var db = redis.GetDatabase(); try
{
///初始化方式1
//db.StreamAdd(_keyStream, "name", "message1", "*");
//db.StreamCreateConsumerGroup(_keyStream, _nameGrourp); //方式2
db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
}
catch { } StreamEntry[] data = null; while (true)
{
data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true); if (data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{data.FirstOrDefault().Id}"); data.FirstOrDefault().Values.ToList().ForEach(c =>
{
Console.WriteLine($" {c.Name}:{c.Value}");
}); db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
}
}
}

StackExchange.Redis 有點比較坑的是不存在阻塞讀取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支援AOF、RDB持久化?

A:支援,其它資料型別一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。

Q:Stream是否還是會丟資料?若是,何種情況下?;

A:會;1、AOF是定時寫盤的,如果資料還在記憶體中時redis服務宕機就會;2、主從切換時(從庫還未同步完成主庫發來的資料,就被提成主庫)

總結

技術中有的時候沒有“銀彈”,只有更適合的技術,汝之蜜糖彼之砒霜;

很多時候的技術選型都是個比較麻煩的東西,對選型人的要求很高;你可能不是隻需要熟悉其中的一種路線,而是要踩過各種各樣的坑,再根據當前受限的環境,選擇比較適合目前需求/團隊的;

回到Stream上,我認為目前Stream能滿足挺大部分佇列需求;

特別是“在專案本身已經使用了Redis的情況下都想直接用Redis來做訊息佇列,而不想引入新的更專業的mq,比如kafka和RabbitMQ的時候”

當然,最終決定需要用更專業的mq與否的,還是需求;

引用

http://www.redis.cn/

https://database.51cto.com/art/202104/659208.htm

https://github.com/2881099/csredis/

https://stackexchange.github.io/StackExchange.Redis/Streams.html