1. 程式人生 > >如何實現延遲佇列

如何實現延遲佇列

延遲佇列的需求各位應該在日常開發的場景中經常碰到。比如:

使用者登入之後5分鐘給使用者做分類推送;

使用者多少天未登入給使用者做召回推送;

定期檢查使用者當前退款賬單是否被商家處理等等場景。

一般這種場景和定時任務還是有很大的區別,定時任務是你知道任務多久該跑一次或者什麼時候只跑一次,這個時間是確定的。延遲佇列是當某個事件發生的時候需要延遲多久觸發配套事件,引子事件發生的時間不是固定的。

業界目前也有很多實現方案,單機版的方案就不說了,現在也沒有哪個公司還是單機版的服務,今天我們一一探討各種方案的大致實現。

1. Redis zset

這個方案比較常用,簡單有效。利用 Redis 的 sorted set 結構,使用 timeStamp 作為 score,比如你的任務是要延遲5分鐘,那麼就在當前時間上加5分鐘作為 score ,輪詢任務每秒只輪詢 score 大於當前時間的 key即可,如果任務支援有誤差,那麼當沒有掃描到有效資料的時候可以休眠對應時間再繼續輪詢。

方案優劣:

優點:

簡單實用,一針見血。

缺點:

  1. 單個 zset 肯定支援不了太大的資料量,如果你有幾百萬的延遲任務需求,大哥我還是勸你換一個方案;
  2. 定時器輪詢方案可能會有異常終止的情況需要自己處理,同時訊息處理失敗的回滾方案,您也要自己處理。

所以,sorted set 的方案並不是一個成熟的方案,他只是一個快速可供落地的方案。

2. RabbitMQ佇列

下面說一個可以落地的方案,這個方案也被大多數目前在架構中使用了 RabbitMQ 的專案組使用。不好的一點就是,捆綁 RabbitMQ,當你的架構方案是要用別的 MQ 替換 RabbitMQ 的時候,你就蛋疼了(我現在正在經歷)。

RabbitMQ 有兩個特性,一個是 Time-To-Live Extensions

,另一個是 Dead Letter Exchanges

  • Time-To-Live Extensions

    RabbitMQ允許我們為訊息或者佇列設定TTL(time to live),也就是過期時間。TTL表明了一條訊息可在佇列中存活的最大時間,單位為毫秒。也就是說,當某條訊息被設定了TTL或者當某條訊息進入了設定了TTL的佇列時,這條訊息會在經過TTL秒後 “死亡”,成為Dead Letter。如果既配置了訊息的TTL,又配置了佇列的TTL,那麼較小的那個值會被取用。

  • Dead Letter Exchanges

    在 RabbitMQ 中,一共有三種訊息的 “死亡” 形式:

    1. 訊息被拒絕。通過呼叫 basic.reject
      或者 basic.nack 並且設定的 requeue 引數為 false;
    2. 訊息因為設定了TTL而過期;
    3. 佇列達到最大長度。

DLX同一般的 Exchange 沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當佇列中有 DLX 訊息時,RabbitMQ就會自動的將 DLX 訊息重新發布到設定的 Exchange 中去,進而被路由到另一個佇列,publish 可以監聽這個佇列中訊息做相應的處理。

由上簡介大家可以看出,RabbitMQ本身是不支援延遲佇列的,只是他的特性讓勤勞的 中國脫髮群體 急中生智(為了完成任務)弄出了這麼一套可用的方案。

可用的方案就是:

  1. 如果有事件需要延遲那麼將該事件傳送到MQ 佇列中,為需要延遲的訊息設定一個TTL;
  2. TTL到期後就會自動進入設定好的DLX,然後由DLX轉發到配置好的實際消費佇列;
  3. 消費該佇列的延遲訊息,處理事件。

方案優劣:

優點:

大品牌元件,用的放心。如果面臨大資料量需求可以很容易的橫向擴充套件,同時訊息支援持久化,有問題可回滾。

缺點:

  1. 配置麻煩,額外增加一個死信交換機和一個死信佇列的配置;
  2. RabbitMQ 是一個訊息中介軟體,TTL 和 DLX 只是他的一個特性,將延遲佇列繫結在一個功能軟體的某一個特性上,可能會有風險。不要槓,當你們組不用 RabbitMQ 的時候遷移很痛苦;
  3. 訊息佇列具有先進先出的特點,如果第一個進入佇列的訊息 A 的延遲是10分鐘,第二個進入佇列的訊息B 的延遲是5分鐘,期望的是誰先到 TTL誰先出,但是事實是B已經到期了,而還要等到 A 的延遲10分鐘結束A先出之後,B 才能出。所以在設計的時候需要考慮不同延遲的訊息要放到不同的佇列。另外該問題官方已經給出了外掛來支援:外掛地址。

3. 基於 Netty#HashedWheelTimer類方法的實現

HashedWheelTimer 是 Netty 中 的一個基礎工具類,主要用來高效處理大量定時任務,且任務對時間精度要求相對不高, 在Netty 中的應用場景就是連線超時或者任務處理超時,一般都是操作比較快速的任務,缺點是記憶體佔用相對較高。

演算法思想

HashedWheelTimer 主要還是一個 DelayQueue 和一個時間輪演算法組合。

Hash Wheel Timer是一個環形結構,可以想象成時鐘,分為很多格子,一個格子代表一段時間(越短Timer精度越高),並用一個List儲存在該格子上到期的所有任務。同時一個指標隨著時間流逝一格一格轉動,並執行對應List中所有到期的任務。

以上圖為例,假設一個格子是1s,則整個時間輪能表示的時間段16s。當前任務指向格子2,表明在第2s的時候有任務需要執行。任務列表中有兩個任務,每個任務前面的數字表示圈數。2表示當走到第2圈的時候才會執行,那麼整個任務的真正執行時間其實是在12s之後執行,即第二圈走到2的時候。每推進一格,對應的每一個 slot 中的round數都要減一。整體演算法就是這麼個邏輯。

時間輪設計要點:

  • tick,一次時間推進,每次推進會檢查/執行超時任務;
  • tickDuration,時間輪推進的最小單元,每隔 tickDuration 會有一次 tick,它決定了時間輪的精確程度;
  • ucket(ticksPerWheel),上圖中的每一隔就是一個bucket,表示一個時間輪可以有多少個tick,它是儲存任務的最小單元;
  • 上層時間輪的 tickDuration 是下層時間輪的表示時間的最大範圍,即:父 tickDuration = 子 tickDuration * 子 bucket 。

需要注意的是,這種方式任務是序列執行的。意味著你如果在時間輪中執行任務且任務耗時較長,將會出現排程超時或者任務堆積的情況。所以要將任務的執行非同步化。

演算法的要點:

  1. 任務並不是直接放在格子中的,而是維護了一個雙向連結串列,這種資料結構非常便於插入和移除;
  2. 新新增的任務並不直接放入格子,而是先放入一個佇列中,這是為了避免多執行緒插入任務的衝突。在每個tick執行任務之前由worker執行緒自動對任務進行歸集和分類,插入到對應的槽位裡面。

Netty 使用陣列 + 雙向連結串列的方式來組織時間輪,對於新增/取消操作僅做了記錄,真正的操作實際發生在下一個tick。時間的推進是獨立的執行緒在做,該執行緒同時也負責過期任務的執行等操作,可簡單認為此步驟操作為O(n),因為推進執行緒需要完全遍歷timeoutscancelledTimeoutsbucket連結串列,在遍歷timeouts時,Netty為了避免任務過多,所以限制每次最多遍歷10萬個,也就是說,一個tick只能規劃10萬個任務,當任務量過大時,會存在超時任務執行時間延遲的現象。

方案優劣:

優點:

實現比較優雅。效率高。

缺點:

  1. 無法實現HA和橫向擴充套件,要麼就使用多個時間輪。
  2. 最重要的是,實現也比較複雜,開發者需要考慮所有可能的情況。

目前我瞭解到的延遲佇列在生產環境下有如上三種實現方式,每一種都有人在使用。當然沒有最好的只有最適合的,你覺得 redis 能滿足需求,就按照最簡單的來,你要是有充足的開發週期,你也可以實現時間輪展現實力。

需求千萬種,變化就一種:給時間都能做。