1. 程式人生 > >阿里雲MQ快速入門指導

阿里雲MQ快速入門指導

官方文件

本文主要描述從開通 MQ 服務、建立 MQ 資源,到使用 MQ SDK 進行訊息收發的完整流程,旨在以最簡單明瞭的方式引導您快速上手 MQ,為進一步使用和熟悉 MQ 的功能提供入門。

訊息收發部分以 TCP 協議下呼叫 Java SDK 為例來演示。

MQ 快速接入流程圖:

quickstart_process

步驟一:開通服務

請按照以下步驟開通 MQ 服務:

  1. 登入阿里雲主頁,將滑鼠依次移動到產品 > 網際網路中介軟體,單擊訊息佇列進入 MQ 產品主頁。

  2. 在 MQ 產品頁上,單擊立即開通進入 MQ 服務開通頁面,根據提示完成開通服務。

如果您已經開通 MQ 服務,請直接登入

MQ 控制檯

步驟二:建立資源

在 MQ 訊息系統中,訊息生產者將訊息傳送到某個指定的訊息主題(Topic) ,而訊息消費者則通過訂閱該指定的 Topic 來獲取和消費訊息。

一個新的應用接入 MQ 需要先建立相關的 MQ 資源,包括:

注意:當您刪除資源,比如 Topic、訊息生產者、訊息消費者的時候,相關的資源也會在 10 分鐘內進行清理。

建立訊息主題(Topic)

訊息主題(Topic)是 MQ 裡對訊息進行的一級歸類,比如可以建立“Topic_Trade”這一主題用來識別交易類訊息。 使用MQ的第一步需要先為您的應用建立 Topic。

您可以按照以下步驟建立 Topic:

  1. 登入MQ 控制檯,預設進入Topic 管理頁面。

  2. 在頁面上方選擇相應的地域(比如公網域),然後單擊建立 Topic按鈕。

    注意:

    • 如果只是測試,或者需要在本地(非阿里雲 ECS 伺服器)使用 MQ 服務,請將 Topic 建立在公網環境。 生產端和消費端可以部署在本地或者部署在任意地域的 ECS 上,前提是本地伺服器或者相應的 ECS 需要能夠訪問公網。
    • 如果在生產環境使用 MQ 服務,需要將應用程式部署在阿里雲 ECS 上,同時 Topic 也需要在應用程式所在的區域(即所部署的 ECS 區域)進行建立。
    • Topic 不能跨域使用。 比如 Topic 建立在“華北 2”這個域,那麼訊息生產端和消費端也必須執行在“華北 2”的 ECS 上。
    • 有關域的詳細介紹請參見 ECS 文件中的地域和可用區
  3. 建立 Topic對話方塊輸入 Topic 名稱及備註,單擊確定。 您建立的 Topic 將出現在 Topic 列表中。

    注意:Topic 名稱必須全域性唯一。 如果名稱已經被其他使用者使用,您將無法建立相同名稱的 Topic。

訊息型別

訊息型別分為以下幾種:

  • Kafka 訊息:相容 MQ 及 Kafka 協議(kafka-client-0.10 及以上版本)的訊息型別。
  • 無序訊息:不保證先入先出(FIFO)的嚴格順序,包含普通訊息、定時/延時訊息、事務訊息。 建議建立不同的 Topic 來發送不同型別的訊息,例如用 Topic A 傳送普通訊息,Topic B 傳送事務訊息, Topic C 傳送延時/定時訊息。
  • 全域性順序訊息:所有訊息嚴格按照 FIFO 的嚴格順序進行生產和消費。
  • 分割槽順序訊息:訊息根據 sharding key 進行分割槽,提高整體併發度與使用效能。 同一個分割槽的訊息嚴格按照 FIFO 的嚴格順序進行生產和消費。

建立生產者(Producer ID)

建立好 Topic 後,要為這個 Topic 建立訊息生產端的資源,即建立 Producer ID。 一個 Topic 只能對應一個 Producer ID。

請按照以下步驟為您的 Topic 建立 Producer ID:

  1. 在 MQ 控制檯左側選單欄選擇Topic 管理

  2. 在 Topic 列表中找到您剛剛建立的 Topic,單擊操作選項中的建立生產者

  3. 建立生產者對話方塊輸入 Producer ID,單擊確定

    注意:

    • Producer ID 必須全域性唯一。 如果名稱已存在,您將無法建立相同名稱的 Producer ID。
    • Topic 對應的生產端必須和這個 Topic 在同一個域,比如您在“公網”域建立了“Topic_open”,那麼和“Topic_open”對應的 Producer ID 也必須在同一個域。
    • Producer ID 和 Topic 的關係是 1:N,即一個 Topic 只能繫結一個 Producer ID, 但是同一個 Producer ID 可以對應多個 Topic。

建立消費者(Consumer ID)

建立完訊息生產端後,您需要為 Topic 申請相應的訊息消費資源,即建立 Consumer ID。

請按以下步驟建立 Consumer ID:

  1. 在 MQ 控制檯左側選單欄選擇 Topic 管理

  2. 找到您建立的 Topic,單擊右側操作選項裡的建立消費者

  3. 建立消費者對話方塊輸入 Consumer ID,單擊確定

    注意:

    • Consumer ID 必須全域性唯一。 如果名稱已存在,您將無法建立相同名稱的 Consumer ID。
    • Consumer ID 必須和對應的 Topic 在同一個域,比如“公網”域的“Topic_open”可繫結同在“公網”域的 Consumer ID “CID_123”,而“華北1”域內的“Topic_huabei1”則不能繫結該Consumer ID。
    • Consumer ID 和 Topic 的關係是 N:N。 同一個 Consumer ID 可以訂閱多個 Topic,同一個 Topic 也可以對應多個 Consumer ID。

建立阿里雲 AccessKey 和 SecretKey

在呼叫 SDK/API 進行訊息傳送和訂閱的時候,除了需要指定建立的 Topic, Producer ID 以及 Consumer ID 以外,還需輸入您在 RAM 控制檯建立的身份驗證資訊,即 Access Key ID 和 Acess Key Secret。

關於如何建立 AccessKey 和 SecretKey, 請參閱建立AccessKey

步驟三:獲取接入域名

在控制檯建立 Topic 資源後,您還需要通過控制檯獲取 Producer ID 的 TCP 接入域名和 Consumer ID 的 TCP 接入域名。

請按照以下步驟獲取 Producer ID 的接入域名或 Consumer ID 的接入域名:

  1. 在 MQ 控制檯左側選單欄,選擇生產者管理消費者管理

  2. 找到您建立的 Producer ID 或 Consumer ID,單擊右側操作列中的獲取接入點

  3. 獲取接入點提示框,單擊複製

完成以上準備工作後,您就可以執行示例程式碼,用 MQ 進行訊息傳送和訂閱了。

步驟四:傳送訊息

您可以通過控制檯傳送訊息或者呼叫 SDK/API 傳送訊息。

  • 控制檯傳送訊息:用於快速驗證 Topic 資源的可用性。

  • 呼叫 SDK/API 傳送訊息:用於生產環境下使用 MQ。

通過控制檯傳送訊息

控制檯傳送訊息步驟如下:

  1. 在 MQ 控制檯的左側選單欄單擊生產者管理

  2. 在列表中找到您剛剛建立的 Topic,單擊右側操作欄裡的傳送

  3. 傳送訊息對話方塊輸入訊息的具體內容,單擊確定。控制檯會返回訊息傳送成功通知以及相應的 Message ID。

呼叫 SDK/API 傳送訊息

在生產環境使用 MQ,建議呼叫 SDK/API 來進行訊息傳送。本文以 TCP 協議下呼叫 Java SDK 為例進行說明。如果需要使用其他協議或者開發語言,請參見相關幫助文件。

呼叫 TCP Java SDK 傳送訊息

  1. 通過下面兩種方式可以引入依賴(任選一種):

    • Maven 方式引入依賴:

      1. <dependency>
      2. <groupId>com.aliyun.openservices</groupId>
      3. <artifactId>ons-client</artifactId>
      4. <version>"XXX"</version>
      5. //設定為 Java SDK 的最新版本號
      6. </dependency>

      關於 Java SDK 的最新版本號,請檢視版本說明

    • 下載依賴 JAR 包:

      關於 Java SDK 最新版本的下載連結,請檢視版本說明

  2. 根據以下說明設定相關引數,執行示例程式碼:

    說明:關於 TCP 接入點域名,請進入 MQ 控制檯的生產者管理頁面,在 PID 右側操作列單擊獲取接入點按鈕獲取。

    1. import com.aliyun.openservices.ons.api.Message;
    2. import com.aliyun.openservices.ons.api.Producer;
    3. import com.aliyun.openservices.ons.api.SendResult;
    4. import com.aliyun.openservices.ons.api.ONSFactory;
    5. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    6. import java.util.Properties;
    7. public class ProducerTest {
    8. public static void main(String[] args) {
    9. Properties properties = new Properties();
    10. // 您在 MQ 控制檯建立的 Producer ID
    11. properties.put(PropertyKeyConst.ProducerId, "XXX");
    12. // 鑑權用 AccessKey,在阿里雲伺服器管理控制檯建立
    13. properties.put(PropertyKeyConst.AccessKey,"XXX");
    14. // 鑑權用 SecretKey,在阿里雲伺服器管理控制檯建立
    15. properties.put(PropertyKeyConst.SecretKey, "XXX");
    16. // 設定 TCP 接入域名,進入 MQ 控制檯的生產者管理頁面,在右側操作欄單擊獲取接入點獲取
    17. // 此處以公有云公網地域接入點為例
    18. properties.put(PropertyKeyConst.ONSAddr,
    19. "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    20. Producer producer = ONSFactory.createProducer(properties);
    21. // 在傳送訊息前,必須呼叫 start 方法來啟動 Producer,只需呼叫一次即可
    22. producer.start();
    23. //迴圈傳送訊息
    24. while(true){
    25. Message msg = new Message( //
    26. // 在控制檯建立的 Topic,即該訊息所屬的 Topic 名稱
    27. "TopicTestMQ",
    28. // Message Tag,
    29. // 可理解為 Gmail 中的標籤,對訊息進行再歸類,方便 Consumer 指定過濾條件在 MQ 伺服器過濾
    30. "TagA",
    31. // Message Body
    32. // 任何二進位制形式的資料, MQ 不做任何干預,
    33. // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
    34. "Hello MQ".getBytes());
    35. // 設定代表訊息的業務關鍵屬性,請儘可能全域性唯一,以方便您在無法正常收到訊息情況下,可通過 MQ 控制檯查詢訊息並補發
    36. // 注意:不設定也不會影響訊息正常收發
    37. msg.setKey("ORDERID_100");
    38. // 傳送訊息,只要不拋異常就是成功
    39. // 列印 Message ID,以便用於訊息傳送狀態查詢
    40. SendResult sendResult = producer.send(msg);
    41. System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
    42. }
    43. // 在應用退出前,可以銷燬 Producer 物件
    44. // 注意:如果不銷燬也沒有問題
    45. producer.shutdown();
    46. }
    47. }

檢視訊息是否傳送成功

訊息傳送後,您可以在控制檯檢視訊息傳送狀態,步驟如下:

  1. 在 MQ 控制檯左側選單欄中單擊訊息查詢

  2. 訊息查詢頁面,選擇按 Message ID 查詢標籤頁。

  3. 在搜尋框中輸入傳送訊息後返回的 Message ID,單擊搜尋查詢訊息傳送狀態。

    “儲存時間”表示 MQ 服務端儲存這條訊息的時間。如果查詢到此訊息,表示訊息已經成功傳送到服務端。

注意:此步驟演示的是第一次使用 MQ 的場景,此時訂閱端從未啟動過,所以訊息狀態顯示暫無消費資料。要啟動訂閱端並進行訊息訂閱請繼續下一步操作訂閱訊息。更多訊息狀態請參見訊息查詢

步驟五:訂閱訊息

訊息傳送成功後,需要啟動訂閱方進行訊息訂閱。本文以 TCP Java SDK 為例,介紹如何通過呼叫相關協議及開發語言的 SDK/API 來完成訊息訂閱。

呼叫 TCP Java SDK 訂閱訊息

您可以執行以下示例程式碼來啟動訂閱端,並測試訂閱訊息的功能。請按照說明正確設定相關引數。目前控制檯提供了 Java,C++, .NET的示例程式碼。

說明:關於 TCP 接入點域名,請進入 MQ 控制檯的消費者管理頁面,在 CID 右側操作列單擊獲取接入點按鈕獲取。

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // 您在 MQ 控制檯建立的 Consumer ID
  13. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14. // 鑑權用 AccessKey,在阿里雲伺服器管理控制檯建立
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // 鑑權用 SecretKey,在阿里雲伺服器管理控制檯建立
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // 設定 TCP 接入域名,進入 MQ 控制檯的消費者管理頁面,在右側操作欄單擊獲取接入點獲取
  19. // 此處以公有云公網地域接入點為例
  20. properties.put(PropertyKeyConst.ONSAddr,
  21. "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
  22. Consumer consumer = ONSFactory.createConsumer(properties);
  23. consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
  24. public Action consume(Message message, ConsumeContext context) {
  25. System.out.println("Receive: " + message);
  26. return Action.CommitMessage;
  27. }
  28. });
  29. consumer.start();
  30. System.out.println("Consumer Started");
  31. }
  32. }

檢視訊息訂閱是否成功

完成上述步驟後,您可以在 MQ 控制檯檢視訂閱端是否啟動成功,即訊息訂閱是否成功。目前控制檯檢視消費者狀態僅支援 TCP 客戶端,暫不支援 HTTP 以及 MQTT 客戶端。

  1. 在 MQ 控制檯左側選單欄單擊消費者管理

  2. 找到要檢視的 Topic,單擊右側操作選項裡的消費者狀態。 如果是否線上顯示為,則說明訂閱端已成功啟動。如果消費者狀態是否線上顯示為,說明消費端沒有啟動或者啟動失敗。

完成以上所有步驟後,您就成功接入了 MQ 服務,可以用 MQ 進行訊息傳送和訂閱了。