redis —— 延時佇列
我們平時習慣於使用 Rabbitmq 和 Kafka 作為訊息佇列中介軟體,來給應用程式之間增加非同步訊息傳遞功能。這兩個中介軟體都是專業的訊息佇列中介軟體,特性之多超出了大多數人的理解能力。
使用過 Rabbitmq 的同學知道它使用起來有多複雜,發訊息之前要建立 Exchange,再建立 Queue,還要將 Queue 和 Exchange 通過某種規則繫結起來,發訊息的時候要指定 routing-key,還要控制頭部資訊。消費者在消費訊息之前也要進行上面一系列的繁瑣過程。但是絕大多數情況下,雖然我們的訊息佇列只有一組消費者,但還是需要經歷上面這些繁瑣的過程。
有了 Redis,它就可以讓我們解脫出來,對於那些只有一組消費者的訊息佇列,使用 Redis 就可以非常輕鬆的搞定。Redis 的訊息佇列不是專業的訊息佇列,它沒有非常多的高階特性,沒有 ack 保證,如果對訊息的可靠性有著極致的追求,那麼它就不適合使用
非同步訊息佇列
Redis 的 list(列表) 資料結構常用來作為非同步訊息佇列使用,使用rpush/lpush
操作入佇列,使用lpop 和 rpop
來出佇列。
> rpush notify-queue apple banana pear (integer) 3 > llen notify-queue (integer) 3 > lpop notify-queue "apple" > llen notify-queue (integer) 2 > lpop notify-queue "banana" > llen notify-queue (integer) 1 > lpop notify-queue "pear" > llen notify-queue (integer) 0 > lpop notify-queue (nil)
上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一樣的。這裡不再贅述。
佇列空了怎麼辦?
客戶端是通過佇列的 pop 操作來獲取訊息,然後進行處理。處理完了再接著獲取訊息,再進行處理。如此迴圈往復,這便是作為佇列消費者的客戶端的生命週期。
可是如果佇列空了,客戶端就會陷入 pop 的死迴圈,不停地 pop,沒有資料,接著再 pop,又沒有資料。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也會被拉高,如果這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。
通常我們使用 sleep 來解決這個問題,讓執行緒睡一會,睡個 1s 鍾就可以了。不但客戶端的 CPU 能降下來,Redis 的 QPS 也降下來了。
time.sleep(1) # python 睡 1s
Thread.sleep(1000) # java 睡 1s
佇列延遲
用上面睡眠的辦法可以解決問題。但是有個小問題,那就是睡眠會導致訊息的延遲增大。如果只有 1 個消費者,那麼這個延遲就是 1s。如果有多個消費者,這個延遲會有所下降,因為每個消費者的睡覺時間是岔開來的。
有沒有什麼辦法能顯著降低延遲呢?你當然可以很快想到:那就把睡覺的時間縮短點。這種方式當然可以,不過有沒有更好的解決方案呢?當然也有,那就是 blpop/brpop。
這兩個指令的字首字元b代表的是blocking,也就是阻塞讀。
阻塞讀在佇列沒有資料的時候,會立即進入休眠狀態,一旦資料到來,則立刻醒過來。訊息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。
空閒連線自動斷開
你以為上面的方案真的很完美麼?先別急著開心,其實他還有個問題需要解決。
什麼問題?—— 空閒連線的問題。
如果執行緒一直阻塞在哪裡,Redis 的客戶端連線就成了閒置連線,閒置過久,伺服器一般會主動斷開連線,減少閒置資源佔用。這個時候blpop/brpop
會丟擲異常來。
所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。
鎖衝突處理
上節課我們講了分散式鎖的問題,但是沒有提到客戶端在處理請求時加鎖沒加成功怎麼辦。一般有 3 種策略來處理加鎖失敗:
- 直接丟擲異常,通知使用者稍後重試;
- sleep 一會再重試;
- 將請求轉移至延時佇列,過一會再試;
直接丟擲特定型別的異常
這種方式比較適合由使用者直接發起的請求,使用者看到錯誤對話方塊後,會先閱讀對話方塊的內容,再點選重試,這樣就可以起到人工延時的效果。如果考慮到使用者體驗,可以由前端的程式碼替代使用者自己來進行延時重試控制。它本質上是對當前請求的放棄,由使用者決定是否重新發起新的請求。
sleep
sleep 會阻塞當前的訊息處理執行緒,會導致佇列的後續訊息處理出現延遲。如果碰撞的比較頻繁或者佇列裡訊息比較多,sleep 可能並不合適。如果因為個別死鎖的 key 導致加鎖不成功,執行緒會徹底堵死,導致後續訊息永遠得不到及時處理。
延時佇列
這種方式比較適合非同步訊息處理,將當前衝突的請求扔到另一個佇列延後處理以避開衝突。
延時佇列的實現
延時佇列可以通過 Redis 的 zset(有序列表) 來實現。我們將訊息序列化成一個字串作為 zset 的value,這個訊息的到期處理時間作為score,然後用多個執行緒輪詢 zset 獲取到期的任務進行處理,多個執行緒是為了保障可用性,萬一掛了一個執行緒還有其它執行緒可以繼續處理。因為有多個執行緒,所以需要考慮併發爭搶任務,確保任務不能被多次執行。
def delay(msg):
msg.id = str(uuid.uuid4()) # 保證 value 值唯一
value = json.dumps(msg)
retry_ts = time.time() + 5 # 5 秒後重試
redis.zadd("delay-queue", retry_ts, value)
def loop():
while True:
# 最多取 1 條
values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
if not values:
time.sleep(1) # 延時佇列空的,休息 1s
continue
value = values[0] # 拿第一條,也只有一條
success = redis.zrem("delay-queue", value) # 從訊息佇列中移除該訊息
if success: # 因為有多程序併發的可能,最終只會有一個程序可以搶到訊息
msg = json.loads(value)
handle_msg(msg)
Redis 的 zrem 方法是多執行緒多程序爭搶任務的關鍵,它的返回值決定了當前例項有沒有搶到任務,因為 loop 方法可能會被多個執行緒、多個程序呼叫,同一個任務可能會被多個程序執行緒搶到,通過 zrem 來決定唯一的屬主。
同時,我們要注意一定要對 handle_msg 進行異常捕獲,避免因為個別任務處理問題導致迴圈異常退出。以下是 Java 版本的延時佇列實現,因為要使用到 Json 序列化,所以還需要 fastjson 庫的支援。
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化物件中存在 generic 型別時,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時佇列 ,5s 後再試
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一條
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇會繼續
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 搶到了
TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}
進一步優化
上面的演算法中同一個任務可能會被多個程序取到之後再使用 zrem 進行爭搶,那些沒搶到的程序都是白取了一次任務,這是浪費。可以考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到伺服器端進行原子化操作,這樣多個程序之間爭搶任務時就不會出現這種浪費了
思考
- Redis 作為訊息佇列為什麼不能保證 100% 的可靠性?
- 使用 Lua Scripting 來優化延時佇列的邏輯
1. Redis如果保證訊息佇列100%可靠性,需要付出更多的成本,比如訊息如何持久化...這會影響其效能。
2. Lua是Redis內建指令碼,執行Lua指令碼時,Redis執行緒會依次執行指令碼中的語句,對於客戶端來說操作是原子性的