痛點背景
業務場景
假設有這麼一個需求,使用者下單後如果30分鐘未支付,則該訂單需要被關閉。你會怎麼做?
之前方案
最簡單的做法,可以服務端啟動個定時器,隔個幾秒掃描資料庫中待支付的訂單,如果(當前時間-訂單建立時間)>30分鐘,則關閉訂單。
方案評估
優點:是實現簡單,缺點呢?
缺點:定時掃描意味著隔個幾秒就得查一次資料庫,頻率高的情況下,如果資料庫中訂單總量特別大,這種高頻掃描會對資料庫帶來一定壓力,待付款訂單特別多時(做個爆品秒殺活動,或者啥促銷活動),若一次性查到記憶體中,容易引起宕機,需要分頁查詢,多少也會有一定資料庫層面壓力。
延時隊列出現
能夠在指定時間間隔後觸發某個業務操作
能夠應對業務資料量特別大的特殊場景
RocketMQ延時訊息能夠完美的解決上述需求,正常的訊息在投遞後會立馬被消費者所消費,而延時訊息在投遞時,需要設定指定的延時級別(不同延遲級別對應不同延遲時間),即等到特定的時間間隔後訊息才會被消費者消費,這樣就將資料庫層面的壓力轉移到了MQ中,也不需要手寫定時器,降低了業務複雜度,同時MQ自帶削峰功能,能夠很好的應對業務高峰。
功能特點
RocketMQ支援傳送延遲訊息,但不支援任意時間的延遲訊息的設定,僅支援內建預設值的延遲時間間隔的延遲訊息;
預設值的延遲時間間隔為:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在訊息建立的時候,呼叫 setDelayTimeLevel(int level) 方法設定延遲時間;
broker在接收到延遲訊息的時候會把對應延遲級別的訊息先儲存到對應的延遲佇列中,等延遲訊息時間到達時,會把訊息重新儲存到對應的topic的queue裡面。
Broker處理延遲訊息
延時佇列生產者端:
延時訊息的關鍵點在於Producer生產者需要給訊息設定特定延時級別,消費端程式碼與正常消費者沒有差別。
public class Producer {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//設定namesrv地址
producer.setNamesrvAddr("111.231.110.149:9876");
//啟動生產者
producer.start();
//傳送10條訊息
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//設定訊息延時級別 3對應10秒後傳送
//延時級別1對應延時1秒後傳送訊息
//延時級別2對應延時5秒後傳送訊息
//延時級別3對應延時10秒後傳送訊息
//以此類推。
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
初始化
DefaultMessageStore在啟動時,會呼叫ScheduleMessageService#load()方法來載入訊息消費進度和初始化延遲級別對應map,然後呼叫ScheduleMessageService#start()方法來啟動類
load方法
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
return result;
}
ScheduleMessageService繼承自ConfigManager類,super.load()方法對應
public boolean load() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
延時佇列原始碼分析:
先從延時訊息延遲級別設定與broker端訊息持久化入手。
具體實現
RocketMQ傳送延時訊息時先把訊息按照延遲時間段傳送到指定的佇列中(rocketmq把每種延遲時間段的訊息都存放到同一個佇列中)然後通過一個定時器進行輪訓這些佇列,檢視訊息是否到期,如果到期就把這個訊息傳送到指定topic的佇列中,這樣的好處是同一佇列中的訊息延時時間是一致的,還有一個好處是這個佇列中的訊息時按照訊息到期時間進行遞增排序的,說的簡單直白就是佇列中訊息越靠前的到期時間越早。
啟動延遲訊息定時任務
如果想要深入瞭解的可以看一下ScheduleMessageService這個類
內部變數含義
延時訊息定時投遞相關具體實現程式碼在ScheduleMessageService中,先看下變數定義
delayLevelTable定義了延遲級別和延遲時間的對應關係
offsetTable存放延延遲級別對應的佇列消費的offset
ScheduleMessageService.start()
延遲訊息投遞
其中根據,delayLevel獲取消費佇列id的方法如下,即queueId = delayLevel-1
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
核心邏輯就是取出tagCode(延時訊息持久化時,tagsCode儲存的是訊息投遞時間),解析成訊息投遞時間,與當前時間戳做差,判斷是否應該進行訊息投遞,具體進行訊息投遞的方法,在if (countdown <= 0)中,看下程式碼
每個掃描任務主要是把佇列中所有到期的訊息都拿出來,併發送到指定的topic下,並把延遲佇列中的訊息刪除
重新投遞實現
重新構建投遞訊息的關鍵點在於messageTimeup中,其構建了一個新的訊息,並從延時訊息屬性中恢復出了原有的topic,queueId,再呼叫putMessage重新進行投遞。
總結
優點:設計簡單,把所有相同延遲時間的訊息都先放到一個佇列中,定時掃描,可以保證訊息消費的有序性
缺點:定時器採用了timer,timer是單執行緒執行,如果延遲訊息數量很大的情況下,可能單執行緒處理不過來,造成訊息到期後也沒有傳送出去的情況
改進點:可以在每個延遲佇列上各採用一個timer,或者使用timer進行掃描,加一個執行緒池對訊息進行處理,這樣可以提供效率
基本思路已經介紹完,梳理下延時訊息實現思路
- producer端設定訊息delayLevel延遲級別,訊息屬性DELAY中儲存了對應了延時級別
- broker端收到訊息後,判斷延時訊息延遲級別,如果大於0,則備份訊息原始topic,queueId,並將訊息topic改為延時訊息佇列特定topic(SCHEDULE_TOPIC),queueId改為延時級別-1
- mq服務端ScheduleMessageService中,為每一個延遲級別單獨設定一個定時器,定時(每隔1秒)拉取對應延遲級別的消費佇列
- 根據消費偏移量offset從commitLog中解析出對應訊息
- 從訊息tagsCode中解析出訊息應當被投遞的時間,與當前時間做比較,判斷是否應該進行投遞
- 若到達了投遞時間,則構建一個新的訊息,並從訊息屬性中恢復出原始的topic,queueId,並清除訊息延遲屬性,從新進行訊息投遞