1. 程式人生 > >RocketMQ延遲訊息的程式碼實戰及原理分析

RocketMQ延遲訊息的程式碼實戰及原理分析

### RocketMQ簡介 RocketMQ是一款開源的分散式訊息系統,基於高可用分散式叢集技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的訊息釋出與訂閱服務。 它前身是**MetaQ**,是阿里基於Kafka的設計使用Java進行自主研發的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻給Apache軟體基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化專案。2017 年,Apache軟體基金會宣佈RocketMQ已孵化成為 Apache頂級專案(Top Level Project,簡稱為TLP ),是國內首個網際網路中介軟體在 Apache上的頂級專案。 ### 延遲訊息 生產者把訊息傳送到訊息佇列中以後,並不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類訊息通常被稱為**延遲訊息**。 在RocketMQ中,支援延遲訊息,但是不支援任意時間精度的延遲訊息,只支援特定級別的延遲訊息。如果要支援任意時間精度,不能避免在Broker層面做訊息排序,再涉及到持久化的考量,那麼訊息排序就不可避免產生巨大的效能開銷。 訊息延遲級別分別為**1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h**,共18個級別。在傳送訊息時,設定訊息延遲級別即可,設定訊息延遲級別時有以下3種情況: 1. 設定訊息延遲級別等於0時,則該訊息為非延遲訊息。 2. 設定訊息延遲級別大於等於1並且小於等於18時,訊息延遲特定時間,如:設定訊息延遲級別等於1,則延遲1s;設定訊息延遲級別等於2,則延遲5s,以此類推。 3. 設定訊息延遲級別大於18時,則該訊息延遲級別為18,如:設定訊息延遲級別等於20,則延遲2h。 文章持續更新,微信搜尋「萬貓學社第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。 ### 延遲訊息示例 首先,寫一個消費者,用於消費延遲訊息: ```java public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 例項化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 設定NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息 consumer.subscribe("OneMoreTopic", "*"); // 註冊回撥實現類來處理從broker拉取回來的訊息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
{ System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 啟動消費者例項 consumer.start(); System.out.println("Consumer Started."); } } ``` 再寫一個延遲訊息的生產者,用於傳送延遲訊息: ```java public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 例項化訊息生產者Producer DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 設定NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 啟動Producer例項 producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" //設定訊息延遲級別為3,也就是延遲10s。 msg.setDelayTimeLevel(3); // 傳送訊息到一個Broker SendResult sendResult = producer.send(msg); // 通過sendResult返回訊息是否成功送達 System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不再發送訊息,關閉Producer例項。 producer.shutdown(); } } ``` 執行生產者以後,就會發送一條延遲訊息: ``` 10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000 ``` 10秒鐘後,消費者收到的這條延遲訊息: ``` 10:37:25.026 ConsumeMessageThread_1 Receive New Messages: Msg Id: C0A8006D5AB018B4AAC216E0DB690000 Body: This is a delay message. ``` 文章持續更新,微信搜尋「萬貓學社
第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。 ### 延遲訊息的原理分析 >以下分析的RocketMQ原始碼的版本號是4.7.1,版本不同原始碼略有差別。 #### CommitLog 在**org.apache.rocketmq.store.CommitLog**中,針對延遲訊息做了一些處理: ```java // 延遲級別大於0,就是延時訊息 if (msg.getDelayTimeLevel() > 0) { // 判斷當前延遲級別,如果大於最大延遲級別, // 就設定當前延遲級別為最大延遲級別。 if (msg.getDelayTimeLevel() >
this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 獲取延遲訊息的主題, // 其中RMQ_SYS_SCHEDULE_TOPIC的值為SCHEDULE_TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 根據延遲級別獲取延遲訊息的佇列Id, // 佇列Id其實就是延遲級別減1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 備份真正的主題和佇列Id MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 設定延時訊息的主題和佇列Id msg.setTopic(topic); msg.setQueueId(queueId); } ``` 可以看到,每一個延遲訊息的主題都被暫時更改為**SCHEDULE_TOPIC_XXXX**,並且根據延遲級別延遲訊息變更了新的佇列Id。接下來,處理延遲訊息的就是**org.apache.rocketmq.store.schedule.ScheduleMessageService**。 #### ScheduleMessageService ScheduleMessageService是由**org.apache.rocketmq.store.DefaultMessageStore**進行初始化的,初始化包括構造物件和呼叫`load`方法。最後,再執行ScheduleMessageService的`start`方法: ```java public void start() { // 使用AtomicBoolean確保start方法僅有效執行一次 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 遍歷所有延遲級別 for (Map.Entry entry : this.delayLevelTable.entrySet()) { // key為延遲級別 Integer level = entry.getKey(); // value為延遲級別對應的毫秒數 Long timeDelay = entry.getValue(); // 根據延遲級別獲得對應佇列的偏移量 Long offset = this.offsetTable.get(level); // 如果偏移量為null,則設定為0 if (null == offset) { offset = 0L; } if (timeDelay != null) { // 為每個延遲級別建立定時任務, // 第一次啟動任務延遲為FIRST_DELAY_TIME,也就是1秒 this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 延遲10秒後每隔flushDelayOffsetInterval執行一次任務, // 其中,flushDelayOffsetInterval預設配置也為10秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化每個佇列消費的偏移量 if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); } } ``` 遍歷所有延遲級別,根據延遲級別獲得對應佇列的偏移量,如果偏移量不存在,則設定為0。然後為每個延遲級別建立定時任務,第一次啟動任務延遲為1秒,第二次及以後的啟動任務延遲才是延遲級別相應的延遲時間。 然後,又建立了一個定時任務,用於持久化每個佇列消費的偏移量。持久化的頻率由**flushDelayOffsetInterval**屬性進行配置,預設為10秒。 文章持續更新,微信搜尋「萬貓學社第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。 #### 定時任務 ScheduleMessageService的`start`方法執行之後,每個延遲級別都建立自己的定時任務,這裡的定時任務的具體實現就在**DeliverDelayedMessageTimerTask**類之中,它核心程式碼是**executeOnTimeup**方法之中,我們來看一下主要部分: ```java // 根據主題和佇列Id獲取訊息佇列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delayLevel2QueueId(delayLevel)); ``` 如果沒有獲取到對應的訊息佇列,則在**DELAY_FOR_A_WHILE**(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作: ```java // 根據消費偏移量從訊息佇列中獲取所有有效訊息 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); ``` 如果沒有獲取到有效訊息,則在**DELAY_FOR_A_WHILE**(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作: ```java // 遍歷所有訊息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 獲取訊息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 獲取訊息的物理長度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分程式碼... long now = System.currentTimeMillis(); // 計算訊息應該被消費的時間 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 計算下一條訊息的偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE) long countdown = deliverTimestamp - now; // 省略部分程式碼... } ``` 如果當前訊息不到消費的時間,則在`countdown`毫秒後再執行任務。如果到消費的時間,就繼續執行下面操作: ```java // 根據訊息的物理偏移量和大小獲取訊息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); ``` 如果獲取到訊息,則繼續執行下面操作: ```java // 重新構建新的訊息,包括: // 1.清除訊息的延遲級別 // 2.恢復真正的訊息主題和佇列Id MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}," + " discard the msg. msg={}", msgInner.getTopic(), msgInner); continue; } // 重新把訊息傳送到真正的訊息佇列上 PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); ``` 清除了訊息的延遲級別,並且恢復了真正的訊息主題和佇列Id,重新把訊息傳送到真正的訊息佇列上以後,消費者就可以立即消費了。 文章持續更新,微信搜尋「萬貓學社第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。 #### 總結 經過以上對原始碼的分析,可以總結出延遲訊息的實現步驟: 1. 如果訊息的延遲級別大於0,則表示該訊息為延遲訊息,修改該訊息的主題為**SCHEDULE_TOPIC_XXXX**,佇列Id為延遲級別減1。 2. 訊息進入**SCHEDULE_TOPIC_XXXX**的佇列中。 3. 定時任務根據上次拉取的偏移量不斷從佇列中取出所有訊息。 4. 根據訊息的物理偏移量和大小再次獲取訊息。 5. 根據訊息屬性重新建立訊息,清除延遲級別,恢復原主題和佇列Id。 6. 重新發送訊息到原主題的佇列中,供消費者進行消費。

微信公眾號:萬貓學社

微信掃描二維碼

關注後回覆「 電子書」

獲取12本Java必讀技術書籍