1. 程式人生 > >RabbitMQ之入門和基本概念

RabbitMQ之入門和基本概念

目錄

簡介

基本概念

ConnectionFactory、Connection、Channel——基本物件

Queue訊息佇列——內部物件

exchange訊息分發策略(路由)

1. Direct策略

2. Fanout策略

3. Topic策略

4. Headers策略

看一下RabbitMQ是怎麼具體來實現訊息佇列的

引數設定 Message durability、Prefetch count、ACK

RPC機制


整理來源:http://blog.csdn.net/whycold/article/details/41119807

整理來源:https://blog.csdn.net/liuyueyi25/article/details/80489596

整理來源:百度百科

簡介

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等...

RabbitMQ是一個訊息佇列,和Kafka以及阿里的ActiveMQ從屬性來講,乾的都是一回事。訊息佇列的主要目的實現訊息的生產者和消費者之間的解耦,支援多應用之間的非同步協調工作。

使用場景:在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

基本概念

ConnectionFactory、Connection、Channel——基本物件

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的物件。

Connection是RabbitMQ的socket連結,它封裝了socket協議相關部分邏輯。

ConnectionFactory為Connection的製造工廠。

Channel是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、繫結Queue與Exchange、釋出訊息等。

Queue訊息佇列——內部物件

Queue(佇列)是RabbitMQ的內部物件,用於儲存訊息,用下圖表示。

RabbitMQ中的訊息都只能儲存在Queue中,生產者(下圖中的P)生產訊息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取訊息並消費。

多個消費者可以訂閱同一個Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。

 

  • Queue:為承載訊息的容器,為什麼是佇列而不是棧呢?主要是因為絕大部分的場景,我們希望訊息是先進先出,有順序的
  • Producer:生產者,就是產生訊息,並不斷往佇列塞的角色
  • Consumer:消費者,也就是不斷從佇列中獲取訊息的角色

 

exchange訊息分發策略(路由)

這裡聯絡到binding、bandingkey、routingkey、Exchange Type的基本概念

  • routing key

生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。

在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在傳送訊息給Exchange時,通過指定routing key來決定訊息流向哪裡。RabbitMQ為routing key設定的長度限制為255 bytes。

  • Binding

RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將訊息路由到指定的Queue了。

  • Binding key

在繫結(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將訊息傳送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,訊息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。

在繫結多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout型別的Exchange就會無視binding key,而是將訊息路由到所有繫結到該Exchange的Queue。

  • Exchange Type 四個策略

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。

訊息分發策略 直接 路由 扇形
exchange策略 Direct Exchange Topic Exchange Fanout Exchange
特點和應用場景 直接完全匹配模式,適用於精準的訊息分發 Routing Key的匹配模式,支援Routing Key的模糊匹配方式,更適用於多類訊息的聚合 忽略Routing Key, 將訊息分配給所有的Queue,廣播模式,適用於訊息的複用場景
例如 如果繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。 對key進行模式匹配後進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def” 不需要key

1. Direct策略

IMAGE
訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中

簡單來講,就是路由鍵與佇列名完全匹配

  • 如果一個佇列繫結到交換機要求路由鍵為“dog”
  • 只轉發 routing key 標記為“dog”的訊息,
  • 不會轉發“dog.puppy”,也不會轉發“dog.guard”等等
  • 它是完全匹配、單播的模式

舉例說明

IMAGE

Exchange和兩個佇列繫結在一起: 
- Q1的bindingkey是orange 
- Q2的binding key是black和green. 
- 當Producer publish key是orange時, exchange會把它放到Q1上, 如果是black或green就會到Q2上, 其餘的Message被丟棄

2. Fanout策略

IMAGE
從上圖也可以看出,這種策略,將忽略所謂的routing key,將訊息分發到所有繫結的Queue上,更加類似我們理解的廣播模式

3. Topic策略

IMAGE
topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上

可以理解為直接策略的進階版,直接策略是完全精確匹配,而topic則支援正則匹配,滿足某類指定規則的(如以xxx開頭的路由鍵),可以鍵訊息分發過去

  • # 匹配0個或多個單詞
  • * 匹配不多不少一個單詞

一個更直觀的例項如下

IMAGE

Producer傳送訊息時需要設定routing_key,

  • Q1 的binding key 是”.orange.“
  • Q2 是 “..rabbit” 和 “lazy.#”:

產生一個 test.orange.mm 訊息,則會路由到Q1;而如果是 test.orange則無法路由到Q1,因為Q1的規則是三個單詞,中間一個為orange,不滿足這個規則的都無效
產生一個 test.qq.rabbit 或者 lazy.qq 都可以分發到Q2;即路由key為三個單詞,最後一個為rabbit或者不限制單詞個數,主要第一個是lazy的訊息,都可以分發過來
如果產生的是一個 test.orange.rabbit訊息,則Q1和Q2都可以滿足

4. Headers策略

headers型別的Exchange不依賴於routing key與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的headers屬性進行匹配。

在繫結Queue與Exchange時指定一組鍵值對;當訊息傳送到Exchange時,RabbitMQ會取到該訊息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange繫結時指定的鍵值對;如果完全匹配則訊息會路由到該Queue,否則不會路由到該Queue。

這個實際上用得不多,簡單來說,它是根據Message的一些頭部資訊來分發過濾Message,忽略routing key的屬性,如果Header資訊和message訊息的頭資訊相匹配,就進行分發。

 

看一下RabbitMQ是怎麼具體來實現訊息佇列的

åé¨ç»æå¾

  • Message:訊息,包含訊息頭(即附屬的配置資訊)和訊息體(即訊息的實體內容)
  • Publisher:生產者,向交換機發布訊息的主體
  • Exchange:交換機,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列
  • Binding:繫結,用於給Exchange和Queue建立關係,就是我們熟知的配對的KEY
  • Queue:訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
  • Connection:連線
  • Channel:通道,MQ與外部打交道都是通過Channel來的,釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過Channel完成;簡單來說就是訊息通過Channel塞進佇列或者流出佇列
  • Consumer:消費者,從訊息佇列中獲取訊息的主體
  • Virtual Host: 虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 /
  • Broker:訊息佇列伺服器實體

訊息佇列的使用過程大概如下:

(1)客戶端連線到訊息佇列伺服器,開啟一個channel。
(2)客戶端宣告一個exchange,並設定相關屬性。
(3)客戶端宣告一個queue,並設定相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立banding,繫結關係。
(5)客戶端投遞訊息到exchange。

 

引數設定 Message durability、Prefetch count、ACK

  • Message durability

如果我們希望即使在RabbitMQ服務重啟的情況下,也不會丟失訊息,我們可以將Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失。但依然解決不了小概率丟失事件的發生(比如RabbitMQ伺服器已經接收到生產者的訊息,但還沒來得及持久化該訊息時RabbitMQ伺服器就斷電了),如果我們需要對這種小概率事件也要管理起來,那麼我們要用到事務。由於這裡僅為RabbitMQ的簡單介紹,所以這裡將不講解RabbitMQ相關的事務。

  • Prefetch count

前面我們講到如果有多個消費者同時訂閱同一個Queue中的訊息,Queue中的訊息會被平攤給多個消費者。這時如果每個訊息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閒的情況。我們可以通過設定prefetchCount來限制Queue每次傳送給每個消費者的訊息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者傳送一條訊息;消費者處理完這條訊息後Queue會再給該消費者傳送一條訊息。

  • Message acknowledgment——訊息佇列的重要指標ACK

在實際應用中,可能會發生消費者收到Queue中的訊息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致訊息丟失。為了避免這種情況發生,我們可以要求消費者在消費完訊息後傳送一個回執(ACK標誌)給RabbitMQ,如果返回true,則表示消費成功了,RabbitMQ收到訊息回執(Message acknowledgment)後才將該訊息從Queue中移除;如果返回false,且設定為重新入隊,這個訊息可以被重新投遞進來。如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連線斷開,則RabbitMQ會將該訊息傳送給其他消費者(如果存在多個消費者)進行處理。

這裡不存在timeout概念,一個消費者處理訊息時間再長也不會導致該訊息被髮送給其他消費者,除非它的RabbitMQ連線斷開。

這裡會產生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記傳送回執給RabbitMQ,這將會導致嚴重的bug——Queue中堆積的訊息會越來越多;消費者重啟後會重複消費這些訊息並重復執行業務邏輯…

通常實際編碼中,預設是自動ACK的,如果訊息的重要性程度較高,我們應該設定為主動ACK,在接收到訊息之後,自主的返回對應的ACK資訊。

另外pub message是沒有ack的。

RPC機制

MQ本身是基於非同步的訊息處理,前面的示例中所有的生產者(P)將訊息傳送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條訊息都不知道)。

但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端將我的訊息處理完成後再進行下一步處理。這相當於RPC(Remote Procedure Call,遠端過程呼叫)。在RabbitMQ中也支援RPC。

RabbitMQ中實現RPC的機制是:

客戶端傳送請求(訊息)時,在訊息的屬性(MessageProperties,在AMQP協議中定義了14中properties,這些屬性會隨著訊息一起傳送)中設定兩個值replyTo(一個Queue名稱,用於告訴伺服器處理完成後將通知我的訊息傳送到這個Queue中)和correlationId(此次請求的標識號,伺服器處理完成後需要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗);

伺服器端收到訊息並處理;

伺服器端處理完訊息後,將生成一條應答訊息到replyTo指定的Queue,同時帶上correlationId屬性;

客戶端之前已訂閱replyTo指定的Queue,從中收到伺服器的應答訊息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。