1. 程式人生 > >訊息佇列之 RocketMQ

訊息佇列之 RocketMQ

簡介

RocketMQ 特點

RocketMQ 是阿里巴巴在2012年開源的分散式訊息中介軟體,目前已經捐贈給 Apache 軟體基金會,並於2017年9月25日成為 Apache 的頂級專案。作為經歷過多次阿里巴巴雙十一這種“超級工程”的洗禮並有穩定出色表現的國產中介軟體,以其高效能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。其主要特點有:

  1. 靈活可擴充套件性
    RocketMQ 天然支援叢集,其核心四元件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點故障的情況下進行水平擴充套件。

  2. 海量訊息堆積能力
    RocketMQ 採用零拷貝原理實現超大的訊息的堆積能力,據說單機已可以支援億級訊息堆積,而且在堆積了這麼多訊息後依然保持寫入低延遲。

  3. 支援順序訊息
    可以保證訊息消費者按照訊息傳送的順序對訊息進行消費。順序訊息分為全域性有序和區域性有序,一般推薦使用區域性有序,即生產者通過將某一類訊息按順序傳送至同一個佇列來實現。

  4. 多種訊息過濾方式
    訊息過濾分為在伺服器端過濾和在消費端過濾。伺服器端過濾時可以按照訊息消費者的要求做過濾,優點是減少不必要訊息傳輸,缺點是增加了訊息伺服器的負擔,實現相對複雜。消費端過濾則完全由具體應用自定義實現,這種方式更加靈活,缺點是很多無用的訊息會傳輸給訊息消費者。

  5. 支援事務訊息
    RocketMQ 除了支援普通訊息,順序訊息之外還支援事務訊息,這個特性對於分散式事務來說提供了又一種解決思路。

  6. 回溯消費
    回溯消費是指消費者已經消費成功的訊息,由於業務上需求需要重新消費,RocketMQ 支援按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向後回溯。

基本概念

下面是一張 RocketMQ 的部署結構圖,裡面涉及了 RocketMQ 核心的四大元件:Name Server、Broker、Producer、Consumer ,每個元件都可以部署成叢集模式進行水平擴充套件。

img

部署結構圖

生產者

生產者(Producer)負責產生訊息,生產者向訊息伺服器傳送由業務應用程式系統生成的訊息。 RocketMQ 提供了三種方式傳送訊息:同步、非同步和單向。

同步傳送

同步傳送指訊息傳送方發出資料後會在收到接收方發回響應之後才發下一個數據包。一般用於重要通知訊息,例如重要通知郵件、營銷簡訊。

非同步傳送

非同步傳送指傳送方發出資料後,不等接收方發回響應,接著傳送下個數據包,一般用於可能鏈路耗時較長而對響應時間敏感的業務場景,例如使用者視訊上傳後通知啟動轉碼服務。

單向傳送

單向傳送是指只負責傳送訊息而不等待伺服器迴應且沒有回撥函式觸發,適用於某些耗時非常短但對可靠性要求並不高的場景,例如日誌收集。

生產者組

生產者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常傳送一類訊息並且傳送邏輯一致,所以將這些 Producer 分組在一起。從部署結構上看生產者通過 Producer Group 的名字來標記自己是一個叢集。

消費者

消費者(Consumer)負責消費訊息,消費者從訊息伺服器拉取資訊並將其輸入使用者應用程式。站在使用者應用的角度消費者有兩種型別:拉取型消費者、推送型消費者。

拉取型消費者

拉取型消費者(Pull Consumer)主動從訊息伺服器拉取資訊,只要批量拉取到訊息,使用者應用就會啟動消費過程,所以 Pull 稱為主動消費型。

推送型消費者

推送型消費者(Push Consumer)封裝了訊息的拉取、消費進度和其他的內部維護工作,將訊息到達時執行的回撥介面留給使用者應用程式來實現。所以 Push 稱為被動消費型別,但從實現上看還是從訊息伺服器中拉取訊息,不同於 Pull 的是 Push 首先要註冊消費監聽器,當監聽器處觸發後才開始消費訊息。

消費者組

消費者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 通常消費同一類訊息並且消費邏輯一致,所以將這些 Consumer 分組在一起。消費者組與生產者組類似,都是將相同角色的分組在一起並命名,分組是個很精妙的概念設計,RocketMQ 正是通過這種分組機制,實現了天然的訊息負載均衡。消費訊息時通過 Consumer Group 實現了將訊息分發到多個消費者伺服器例項,比如某個 Topic 有9條訊息,其中一個 Consumer Group 有3個例項(3個程序或3臺機器),那麼每個例項將均攤3條訊息,這也意味著我們可以很方便的通過加機器來實現水平擴充套件。

訊息伺服器

訊息伺服器(Broker)是訊息儲存中心,主要作用是接收來自 Producer 的訊息並存儲, Consumer 從這裡取得訊息。它還儲存與訊息相關的元資料,包括使用者組、消費進度偏移量、佇列資訊等。從部署結構圖中可以看出 Broker 有 Master 和 Slave 兩種型別,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結構上看 Broker 的叢集部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(非同步刷盤)。

單 Master

這種方式一旦 Broker 重啟或宕機會導致整個服務不可用,這種方式風險較大,所以顯然不建議線上環境使用。

多 Master

所有訊息伺服器都是 Master ,沒有 Slave 。這種方式優點是配置簡單,單個 Master 宕機或重啟維護對應用無影響。缺點是單臺機器宕機期間,該機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受影響。

多 Master 多 Slave(非同步複製)

每個 Master 配置一個 Slave,所以有多對 Master-Slave,訊息採用非同步複製方式,主備之間有毫秒級訊息延遲。這種方式優點是訊息丟失的非常少,且訊息實時性不會受影響,Master 宕機後消費者可以繼續從 Slave 消費,中間的過程對使用者應用程式透明,不需要人工干預,效能同多 Master 方式幾乎一樣。缺點是 Master 宕機時在磁碟損壞情況下會丟失極少量訊息。

多 Master 多 Slave(同步雙寫)

每個 Master 配置一個 Slave,所以有多對 Master-Slave ,訊息採用同步雙寫方式,主備都寫成功才返回成功。這種方式優點是資料與服務都沒有單點問題,Master 宕機時訊息無延遲,服務與資料的可用性非常高。缺點是效能相對非同步複製方式略低,傳送訊息的延遲會略高。

名稱伺服器

名稱伺服器(NameServer)用來儲存 Broker 相關元資訊並給 Producer 和 Consumer 查詢 Broker 資訊。NameServer 被設計成幾乎無狀態的,可以橫向擴充套件,節點之間相互之間無通訊,通過部署多臺機器來標記自己是一個偽叢集。每個 Broker 在啟動的時候會到 NameServer 註冊,Producer 在傳送訊息前會根據 Topic 到 NameServer 獲取到 Broker 的路由資訊,Consumer 也會定時獲取 Topic 的路由資訊。所以從功能上看應該是和 ZooKeeper 差不多,據說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,後來改為了自己實現的 NameServer 。

訊息

訊息(Message)就是要傳輸的資訊。一條訊息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條訊息也可以擁有一個可選的標籤(Tag)和額處的鍵值對,它們可以用於設定一個業務 key 並在 Broker 上查詢此訊息以便在開發期間查詢問題。

主題

主題(Topic)可以看做訊息的規類,它是訊息的第一級型別。比如一個電商系統可以分為:交易訊息、物流訊息等,一條訊息必須有一個 Topic 。Topic 與生產者和消費者的關係非常鬆散,一個 Topic 可以有0個、1個、多個生產者向其傳送訊息,一個生產者也可以同時向不同的 Topic 傳送訊息。一個 Topic 也可以被 0個、1個、多個消費者訂閱。

標籤

標籤(Tag)可以看作子主題,它是訊息的第二級型別,用於為使用者提供額外的靈活性。使用標籤,同一業務模組不同目的的訊息就可以用相同 Topic 而不同的 Tag 來標識。比如交易訊息又可以分為:交易建立訊息、交易完成訊息等,一條訊息可以沒有 Tag 。標籤有助於保持您的程式碼乾淨和連貫,並且還可以為 RocketMQ 提供的查詢系統提供幫助。

訊息佇列

訊息佇列(Message Queue),主題被劃分為一個或多個子主題,即訊息佇列。一個 Topic 下可以設定多個訊息佇列,傳送訊息時執行該訊息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有佇列將訊息發出去。下圖 Broker 內部訊息情況:

img

Broker 內部訊息

訊息消費模式

訊息消費模式有兩種:叢集消費(Clustering)和廣播消費(Broadcasting)。預設情況下就是叢集消費,該模式下一個消費者叢集共同消費一個主題的多個佇列,一個佇列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。而廣播消費訊息會發給消費者組中的每一個消費者進行消費。

訊息順序

訊息順序(Message Order)有兩種:順序消費(Orderly)和並行消費(Concurrently)。順序消費表示訊息消費的順序同生產者為每個訊息佇列傳送的順序一致,所以如果正在處理全域性順序是強制性的場景,需要確保使用的主題只有一個訊息佇列。並行消費不再保證訊息順序,消費的最大並行數量受每個消費者客戶端指定的執行緒池限制。

工程例項

Java 訪問 RocketMQ 例項

RocketMQ 目前支援 Java、C++、Go 三種語言訪問,按慣例以 Java 語言為例看下如何用 RocketMQ 來收發訊息的。

引入依賴

  <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.2.0</version>
 </dependency>

新增 RocketMQ 客戶端訪問支援,具體版本和安裝的 RocketMQ 版本一致即可。

訊息生產者

package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
   public static void main(String[] args) throws Exception {
       //建立一個訊息生產者,並設定一個訊息生產者組
       DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");
       //指定 NameServer 地址
       producer.setNamesrvAddr("localhost:9876");
       //初始化 Producer,整個應用生命週期內只需要初始化一次
       producer.start();
       for (int i = 0; i < 100; i++) {
           //建立一條訊息物件,指定其主題、標籤和訊息內容
           Message msg = new Message(
                   "topic_example_java" /* 訊息主題名 */,
                   "TagA" /* 訊息標籤 */,
                   ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 訊息內容 */
           );
           //傳送訊息並返回結果
           SendResult sendResult = producer.send(msg);
           System.out.printf("%s%n", sendResult);
       }
       // 一旦生產者例項不再被使用則將其關閉,包括清理資源,關閉網路連線等
       producer.shutdown();
   }
}

示例中用 DefaultMQProducer 類來建立一個訊息生產者,通常一個應用建立一個 DefaultMQProducer 物件,所以一般由應用來維護生產者物件,可以其設定為全域性物件或者單例。該類建構函式入參 producerGroup 是訊息生產者組的名字,無論生產者還是消費者都必須給出 GroupName ,並保證該名字的唯一性,ProducerGroup 傳送普通的訊息時作用不大,後面介紹分散式事務訊息時會用到。

接下來指定 NameServer 地址和呼叫 start 方法初始化,在整個應用生命週期內只需要呼叫一次 start 方法。

初始化完成後,呼叫 send 方法傳送訊息,示例中只是簡單的構造了100條同樣的訊息傳送,其實一個 Producer 物件可以傳送多個主題多個標籤的訊息,訊息物件的標籤可以為空。send 方法是同步呼叫,只要不拋異常就標識成功。

最後應用退出時呼叫 shutdown 方法清理資源、關閉網路連線,從伺服器上登出自己,通常建議應用在 JBOSS、Tomcat 等容器的退出鉤子裡呼叫 shutdown 方法。

訊息消費者

package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
public class Consumer {
   public static void main(String[] args) throws Exception {
       //建立一個訊息消費者,並設定一個訊息消費者組
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
       //指定 NameServer 地址
       consumer.setNamesrvAddr("localhost:9876");
       //設定 Consumer 第一次啟動時從佇列頭部開始消費還是佇列尾部開始消費
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
       //訂閱指定 Topic 下的所有訊息
       consumer.subscribe("topic_example_java", "*");
       //註冊訊息監聽器
       consumer.registerMessageListener(new MessageListenerConcurrently() {
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
               //預設 list 裡只有一條訊息,可以通過設定引數來批量接收訊息
               if (list != null) {
                   for (MessageExt ext : list) {
                       try {
                           System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                       } catch (UnsupportedEncodingException e) {
                           e.printStackTrace();
                       }
                   }
               }
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       });
       // 消費者物件在使用之前必須要呼叫 start 初始化
       consumer.start();
       System.out.println("訊息消費者已啟動");
   }
}

示例中用 DefaultMQPushConsumer 類來建立一個訊息消費者,通生產者一樣一個應用一般建立一個 DefaultMQPushConsumer 物件,該物件一般由應用來維護,可以其設定為全域性物件或者單例。該類建構函式入參 consumerGroup 是訊息消費者組的名字,需要保證該名字的唯一性。

接下來指定 NameServer 地址和設定消費者應用程式第一次啟動時從佇列頭部開始消費還是佇列尾部開始消費。

接著呼叫 subscribe 方法給消費者物件訂閱指定主題下的訊息,該方法第一個引數是主題名,第二個擦書是標籤名,示例表示訂閱了主題名 topic_example_java 下所有標籤的訊息。

最主要的是註冊訊息監聽器才能消費訊息,示例中用的是 Consumer Push 的方式,即設定監聽器回撥的方式消費訊息,預設監聽回撥方法中 List 裡只有一條訊息,可以通過設定引數來批量接收訊息。

最後呼叫 start 方法初始化,在整個應用生命週期內只需要呼叫一次 start 方法。

啟動 Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ 核心的四大元件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,所以要啟動這兩個應用才能提供訊息服務。首先啟動 Name Server,先確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,並設定了環境變數 JAVA_HOME ,然後在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqnamesrv ,預設會將該命令的執行情況輸出到當前目錄的 nohup.out 檔案,最後跟蹤日誌檔案檢視 Name Server 的實際執行情況。

啟動 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

同樣也要確保你的機器中已經安裝了與 RocketMQ 相匹配的 JDK ,並設定了環境變數 JAVA_HOME ,然後在 RocketMQ 的安裝目錄下執行 bin 目錄下的 mqbroker ,預設會將該命令的執行情況輸出到當前目錄的 nohup.out 檔案,最後跟蹤日誌檔案檢視 Broker 的實際執行情況。

執行 Consumer

先執行 Consumer 類,這樣當生產者傳送訊息的時候能在消費者後端看到訊息記錄。配置沒問題的話會看到在控制檯打印出訊息消費者已啟動

執行 Producer

最後執行 Producer 類,在 Consumer 的控制檯能看到接收的訊息

img

消費者接收到訊息

Spring 整合 RocketMQ

不同於 RabbitMQ、ActiveMQ、Kafka 等訊息中介軟體,Spring 社群已經通過多種方式提供了對這些中介軟體產品整合,例如通過 spring-jms 整合 ActiveMQ、通過 Spring AMQP 專案下的 spring-rabbit 整合 RabbitMQ、通過 spring-kafka 整合