1. 程式人生 > >redis —— 延時佇列

redis —— 延時佇列

我們平時習慣於使用 Rabbitmq 和 Kafka 作為訊息佇列中介軟體,來給應用程式之間增加非同步訊息傳遞功能。這兩個中介軟體都是專業的訊息佇列中介軟體,特性之多超出了大多數人的理解能力。

使用過 Rabbitmq 的同學知道它使用起來有多複雜,發訊息之前要建立 Exchange,再建立 Queue,還要將 Queue 和 Exchange 通過某種規則繫結起來,發訊息的時候要指定 routing-key,還要控制頭部資訊。消費者在消費訊息之前也要進行上面一系列的繁瑣過程。但是絕大多數情況下,雖然我們的訊息佇列只有一組消費者,但還是需要經歷上面這些繁瑣的過程。

有了 Redis,它就可以讓我們解脫出來,對於那些只有一組消費者的訊息佇列,使用 Redis 就可以非常輕鬆的搞定。Redis 的訊息佇列不是專業的訊息佇列,它沒有非常多的高階特性,沒有 ack 保證,如果對訊息的可靠性有著極致的追求,那麼它就不適合使用

非同步訊息佇列

Redis 的 list(列表) 資料結構常用來作為非同步訊息佇列使用,使用rpush/lpush操作入佇列,使用lpop 和 rpop來出佇列。

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

 

上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一樣的。這裡不再贅述。

佇列空了怎麼辦?

客戶端是通過佇列的 pop 操作來獲取訊息,然後進行處理。處理完了再接著獲取訊息,再進行處理。如此迴圈往復,這便是作為佇列消費者的客戶端的生命週期。

可是如果佇列空了,客戶端就會陷入 pop 的死迴圈,不停地 pop,沒有資料,接著再 pop,又沒有資料。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也會被拉高,如果這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。

通常我們使用 sleep 來解決這個問題,讓執行緒睡一會,睡個 1s 鍾就可以了。不但客戶端的 CPU 能降下來,Redis 的 QPS 也降下來了。

time.sleep(1)  # python 睡 1s
Thread.sleep(1000)  # java 睡 1s

佇列延遲

用上面睡眠的辦法可以解決問題。但是有個小問題,那就是睡眠會導致訊息的延遲增大。如果只有 1 個消費者,那麼這個延遲就是 1s。如果有多個消費者,這個延遲會有所下降,因為每個消費者的睡覺時間是岔開來的。

有沒有什麼辦法能顯著降低延遲呢?你當然可以很快想到:那就把睡覺的時間縮短點。這種方式當然可以,不過有沒有更好的解決方案呢?當然也有,那就是 blpop/brpop。

這兩個指令的字首字元b代表的是blocking,也就是阻塞讀。

阻塞讀在佇列沒有資料的時候,會立即進入休眠狀態,一旦資料到來,則立刻醒過來。訊息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。

空閒連線自動斷開

你以為上面的方案真的很完美麼?先別急著開心,其實他還有個問題需要解決。

什麼問題?—— 空閒連線的問題。

如果執行緒一直阻塞在哪裡,Redis 的客戶端連線就成了閒置連線,閒置過久,伺服器一般會主動斷開連線,減少閒置資源佔用。這個時候blpop/brpop會丟擲異常來。

所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。

鎖衝突處理

上節課我們講了分散式鎖的問題,但是沒有提到客戶端在處理請求時加鎖沒加成功怎麼辦。一般有 3 種策略來處理加鎖失敗:

  1. 直接丟擲異常,通知使用者稍後重試;
  2. sleep 一會再重試;
  3. 將請求轉移至延時佇列,過一會再試;

直接丟擲特定型別的異常

這種方式比較適合由使用者直接發起的請求,使用者看到錯誤對話方塊後,會先閱讀對話方塊的內容,再點選重試,這樣就可以起到人工延時的效果。如果考慮到使用者體驗,可以由前端的程式碼替代使用者自己來進行延時重試控制。它本質上是對當前請求的放棄,由使用者決定是否重新發起新的請求。

sleep

sleep 會阻塞當前的訊息處理執行緒,會導致佇列的後續訊息處理出現延遲。如果碰撞的比較頻繁或者佇列裡訊息比較多,sleep 可能並不合適。如果因為個別死鎖的 key 導致加鎖不成功,執行緒會徹底堵死,導致後續訊息永遠得不到及時處理。

延時佇列

這種方式比較適合非同步訊息處理,將當前衝突的請求扔到另一個佇列延後處理以避開衝突。

延時佇列的實現

延時佇列可以通過 Redis 的 zset(有序列表) 來實現。我們將訊息序列化成一個字串作為 zset 的value,這個訊息的到期處理時間作為score,然後用多個執行緒輪詢 zset 獲取到期的任務進行處理,多個執行緒是為了保障可用性,萬一掛了一個執行緒還有其它執行緒可以繼續處理。因為有多個執行緒,所以需要考慮併發爭搶任務,確保任務不能被多次執行。

def delay(msg):
    msg.id = str(uuid.uuid4())  # 保證 value 值唯一
    value = json.dumps(msg)
    retry_ts = time.time() + 5  # 5 秒後重試
    redis.zadd("delay-queue", retry_ts, value)


def loop():
    while True:
        # 最多取 1 條
        values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
        if not values:
            time.sleep(1)  # 延時佇列空的,休息 1s
            continue
        value = values[0]  # 拿第一條,也只有一條
        success = redis.zrem("delay-queue", value)  # 從訊息佇列中移除該訊息
        if success:  # 因為有多程序併發的可能,最終只會有一個程序可以搶到訊息
            msg = json.loads(value)
            handle_msg(msg)

Redis 的 zrem 方法是多執行緒多程序爭搶任務的關鍵,它的返回值決定了當前例項有沒有搶到任務,因為 loop 方法可能會被多個執行緒、多個程序呼叫,同一個任務可能會被多個程序執行緒搶到,通過 zrem 來決定唯一的屬主。

同時,我們要注意一定要對 handle_msg 進行異常捕獲,避免因為個別任務處理問題導致迴圈異常退出。以下是 Java 版本的延時佇列實現,因為要使用到 Json 序列化,所以還需要 fastjson 庫的支援。

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化物件中存在 generic 型別時,需要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時佇列 ,5s 後再試
  }

  
  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一條
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇會繼續
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 搶到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}

進一步優化

上面的演算法中同一個任務可能會被多個程序取到之後再使用 zrem 進行爭搶,那些沒搶到的程序都是白取了一次任務,這是浪費。可以考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到伺服器端進行原子化操作,這樣多個程序之間爭搶任務時就不會出現這種浪費了

思考

  1. Redis 作為訊息佇列為什麼不能保證 100% 的可靠性?
  2. 使用 Lua Scripting 來優化延時佇列的邏輯

1. Redis如果保證訊息佇列100%可靠性,需要付出更多的成本,比如訊息如何持久化...這會影響其效能。

2. Lua是Redis內建指令碼,執行Lua指令碼時,Redis執行緒會依次執行指令碼中的語句,對於客戶端來說操作是原子性的