1. 程式人生 > >消息隊列 RabbitMQ

消息隊列 RabbitMQ

source pac manage 控制 標識 ability 都是 用戶 自己的

前言

市面上的消息隊列產品有很多,比如老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,阿裏巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種。

什麽是rabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。一款基於AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。而且使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ是由RabbitMQ Technologies Ltd開發並且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被並入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在並沒有上市。

RabbitMQ的官網是http://www.rabbitmq.com


RabbitMQ 最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。

  2. 靈活的路由(Flexible Routing)
    在消息進入隊列之前,通過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。

  3. 消息集群(Clustering)
    多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。

  4. 高可用(Highly Available Queues)
    隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。

  5. 多種協議(Multi-protocol)
    RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。

  6. 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)
    如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什麽。

  9. 插件機制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。

系統架構

技術分享

  • 左側 P 代表 生產者,也就是往 RabbitMQ 發消息的程序。
  • 中間即是 RabbitMQ,其中包括了 交換機 和 隊列。
  • 右側 C 代表 消費者,也就是往 RabbitMQ 拿消息的程序。

其中比較重要的概念,分別為:虛擬主機,交換機,和綁定。

  • 虛擬主機vhost:一個虛擬主機持有一組交換機、隊列和綁定。為什麽需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。
  • 交換機Exchange:Exchange 用於轉發消息,但是它不會做存儲 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。
    • 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麽究竟轉發到哪個隊列,就要根據該路由鍵。
  • 綁定Binding:也就是交換機需要和隊列相綁定,這其中如上圖所示,是多對多的關系。

還有幾個概念是上述圖中沒有標明的,那就是Connection(連接),Channel(通道,頻道)。

Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以後我們可以看到,程序的起始處就是建立這個TCP連接。

Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。


Broker: 簡單來說就是消息隊列服務器實體。

Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。

Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞。

producer: 消息生產者,就是投遞消息的程序。
consumer: 消息消費者,就是接受消息的程序。

由Exchange,Queue,RoutingKey三個才能決定一個從Exchange到Queue的唯一的線路。

Exchange 類型

Exchange分發消息時根據類型的不同分發策略有區別,主要三種類型:direct、fanout、topic

direct

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。

技術分享

以上圖的配置為例,我們以routingKey=”error”發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來發送消息,則消息只會路由到Queue2。如果我們以其他routingKey發送消息,則消息不會路由到這兩個Queue中。


fanout

fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。

技術分享

上圖中,生產者(P)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,並最終被兩個消費者(C1與C2)消費。

topic

前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不同,它約定:

routing key為一個句點號“.”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key與routing key一樣也是句點號“. ”分隔的字符串
binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

技術分享

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。


RPC

MQ本身是基於異步的消息處理,前面的示例中所有的生產者(P)將消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。

但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的消息處理完成後再進行下一步處理。這相當於RPC(Remote Procedure Call,遠程過程調用)。

在RabbitMQ中也支持RPC。

技術分享

RabbitMQ中實現RPC的機制是:

  • 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨著消息一起發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知我的消息發送到這個Queue中)和correlationId(此次請求的標識號,服務器處理完成後需要將此屬性返還,客戶端將根據這個id了解哪條請求被成功執行了或執行失敗)
  • 服務器端收到消息並處理
  • 服務器端處理完消息後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
  • 客戶端之前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理

Callback queue 回調隊列

一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那麽客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to

Correlation id 關聯標識

一個客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬於哪個請求。

消息隊列 RabbitMQ