1. 程式人生 > >RabbitMQ實戰:理解消息通信

RabbitMQ實戰:理解消息通信

RabbitMQ

本系列是「RabbitMQ實戰:高效部署分布式消息隊列」書籍的總結筆記。

前段時間總結完了「深入淺出MyBatis」系列,對MyBatis有了更全面和深入的了解,在掘金社區也收到了一些博友的喜歡,很高興。另外,短暫的陪產假就要結束了,小寶也二周了,下周二就要投入工作了,希望自己盡快調整過來,加油努力。

從本篇開始總結「RabbitMQ實戰」系列的閱讀筆記,RabbitMQ是一個開源的消息代理和隊列服務器,可以通過基本協議在完全不同的應用之間共享數據,可以將作業排隊以便讓分布式服務進行處理。

本篇介紹下消息通信,首先介紹基礎概念,將這些概念映射到AMQP協議,然後介紹消息持久化、發送方確認模式等消息可靠性保證。

通過本篇介紹,你會了解到:

  • 消息通信概念:消費者、生產者和代理
  • AMQP元素:隊列、交換器、綁定
  • 虛擬主機
  • 消息持久化
  • 發送方確認模式

消息通信概念

此部分的介紹,會牽涉到AMQP的元素,如果之前沒接觸過的,可以結合下面的「AMQP元素」進行理解。

消息

消息是傳輸的主體,消息包括兩部分:有效載荷(payload)和標簽(label);有效載荷是要傳輸的數據,可以是任何內容,比如JSON串、二進制、自定義的數據協議等;標簽描述了有效載荷,並且Rabbit用它來決定誰將獲得消息的投遞。

可以與HTTP協議類比,HTTP消息頭部描述了消息體的類型、大小等,HTTP消息體是要傳輸的數據,HTTP服務端通過消息頭部決定如何處理請求和數據。

生產者和消費者

生產者創建消息,然後發送到代理服務器(RabbitMQ Server),AMQP只會用標簽表述這條消息(一個交換器名稱和可選的主題標記),Rabbit服務器會根據標簽把消息發送給訂閱的消費者。

消費者消費消息,它會訂閱到隊列(queue)上,每當有消息到達RabbitMQ服務器時,會發送給消費者,消費者收到消息時,會進行處理。

註意:消費者收到的消息只包括有效載荷,所有不會知道是從哪裏發來的。

連接和信道

要想發布或消費消息,必須先與RabbitMQ Server建立一條TCP連接,建立TCP連接之後,要創建一條信道,信道是建立在真實TCP連接的虛擬連接。

AMQP命令都是通過信道發送出去的,每條信道會被指派一個唯一的ID,為什麽不直接通過TCP連接發送AMQP命令呢? 因為操作系統建立和銷毀TCP會話是很昂貴的,而且創建的連接數也有限。 通過引入通道,可以在連接上建立通道,而且通道是私密的,相互不受影響。

通道的概念還是有點抽象,後面專門寫一篇文章進行分析介紹,這裏簡單理解下吧。

AMQP元素

AMQP消息路由有三部分組成:隊列、交換器和綁定,隊列是存放消息的地方,交換器是決定不同的分發策略,綁定是隊列和交換器的橋梁,定義匹配規則。

生產者發送消息到交換器,交換器根據自身類型和綁定規則,將消息存放在對應隊列中,然後將消息發送到監聽隊列的消費者。

技術分享圖片

如上圖:P為生產者,X為交換器,交換器類型為direct,根據不同的綁定規則(orange、black、green),分發給不同的隊列,C為消費者,從不同的隊列介紹消息。

隊列

消費者通過兩種方式從特定的隊列接收消息:

  • basic.consume,這樣會將信道置為接收模式,直到取消對隊列的訂閱;
  • basic.get,主動讓消費者接收隊列中的下一條消息;

basic.get會影響性能,推薦使用basic.consume來實現高吞吐量,因為其處理過程是先訂閱消息,獲取單條消息,再取取消訂閱。

如果隊列擁有多個消費者時,隊列收到的消息將以循環的方式發給消費者,即多個消費者平均消費這些消息。

另外,消費者接收到的每一條消息都要進行確認,必須通過basic.ack命令向rabbitmq服務端發送一個確認。 也可以設置auto_ack為true,只要消費者接收到消息,就自動視為確認,不過不建議這樣,因為接收到不代表業務邏輯處理成功。 服務端接收到確認後,會從隊列中刪除對應消息。

還有一種場景,在接收到消息後,如果不想處理,可以通過下面方式處理:

  • 把消費者從RabbitMQ服務器斷開連接,,這樣RabbitMQ會自動將消息入隊並發送給另外一個消費者;
  • 如果不想發送給其他消費者處理,就是想忽略這個消息,可以發送basic.reject命令;

最後來介紹下如何創建隊列,首先明確下是生成者還是消費者創建,關鍵點是:生產者能否承擔起丟失消息,因為發出去的消息如果路由到了不存在的隊列,Rabbit會忽略它們。所以,建議生成者和消費者都嘗試去創建隊列,可以通過設置queue.declare的passive選項設置為ture來判斷隊列是否存在,如果不存在會返回一個錯誤。

通過queue.declare命令來創建隊列,有一些選項說明下:

  • exclusive:如果設置true的化,隊列將變成私有的,只有創建隊列的應用程序才能夠消費隊列消息;
  • auto-delete:當最後一個消費者取消訂閱的時候,隊列會自動移除;
  • durable:是否要持久化;
queueDeclare(String queue, 
            boolean durable, 
            boolean exclusive, 
            Map<String, Object> arguments);
交換器和綁定

交換器有四種類型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由鍵,不太實用,就不詳細介紹了。

第一種:direct交換器

direct交換器比較簡單,如果和路由鍵 完全匹配 的話,就會投遞到對應的隊列:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

服務器默認包含一個空白字符串名稱的默認路由器,當聲明一個隊列時,會自定綁定到默認交換器,並以隊列名稱作為路由鍵。

第二種:fanout交換器

fanout交換器,不處理路由鍵,只需要簡單的將隊列綁定到交換機上,為會每個消費者自動生成一個隨機隊列,所有的消費者都會收到所有消息。

技術分享圖片

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

第三種:topic交換器

topic交換器,將路由鍵和某模式進行匹配,此時隊列需要綁定要一個模式上。

技術分享圖片

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

關於模式,符號#匹配一個或多個詞,符號匹配一個詞,因此kfs.#能夠匹配到kfs.session.message,但是audit.只會匹配到audit.session。

虛擬主機

每個RabbitMQ服務器都能創建虛擬消息服務器,稱為虛擬主機(vhost),每個RabbitMQ本質上是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定,還有自己的權限機制。

連接時,必須制定vhost,rabbitmq包含了默認的vhost:"/"。當創建一個用戶時,會被指派給至少一個vhsot,並且相互隔離。

vhost不能通過AMQP協議創建,需要使用rabbitmqctl工具創建。

消息持久化和發送方確認模式

如果沒有持久化,重啟rabbitmq後,隊列、交換器都會消失,RabbitMQ提供了持久化的功能,需要滿足以下三個條件:

  • 交換器設置為持久化,通過durable屬性;
  • 隊列設置為持久化,通過durable屬性;
  • 消息投遞模式delivery設置為2;

當發布一條持久化消息到持久化交換器上時,rabbit會在消息提交到日誌文件後才會發送響應,所有會損失性能,所以,只對重要數據持久化即可。

考慮這種情況:由於發布消息後,不返回任何信息給生產者,如何只對服務器已經持久化到硬盤了呢,可能在傳輸過程中丟失,或者持久化前服務器宕機,導致消息丟失。

RabbitMQ通過「發送方確認模式」來解決上面的問題。首先,需要將信道設置成confirm模式,這樣所有在信道上發布的消息都會被指派一個唯一的ID號,一旦消息被投遞到所有匹配的隊列或持久化到磁盤,會發送一個確認消息給生產者。

通過本篇的介紹,對Rabbit的消息模型有了整體了解,下一篇會寫個DEMO,並介紹下運行和管理RabbitMQ。

歡迎掃描下方二維碼,關註我的個人微信公眾號 ~

技術分享圖片

RabbitMQ實戰:理解消息通信