1. 程式人生 > >Rabbitmq交換器Exchange和訊息佇列

Rabbitmq交換器Exchange和訊息佇列

通常我們談到佇列服務, 會有三個概念: 發訊息者、佇列、收訊息者,RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發訊息者和 佇列之間, 加入了交換器 (Exchange). 這樣發訊息者和佇列就沒有直接聯絡, 轉而變成發訊息者把訊息給交換器, 交換器根據排程策略再把訊息再給佇列。

交換器的功能主要是接收訊息並且轉發到繫結的佇列,交換器不儲存訊息,在啟用ack模式後,交換器找不到佇列會返回錯誤。交換器有四種類型:Direct, topic, Headers and Fanout

  • Direct:direct 型別的行為是"先匹配, 再投送". 即在繫結時設定一個 routing_key, 訊息的routing_key匹配時, 才會被交換器投送到繫結的佇列中去.
  • Topic:按規則轉發訊息(最靈活)
  • Headers:設定header attribute引數型別的交換器
  • Fanout:轉發訊息到所有繫結佇列

Direct Exchange
Direct Exchange是RabbitMQ預設的交換器模式,也是最簡單的模式,根據key全文匹配去尋找佇列。

第一個 X - Q1 就有一個 binding key,名字為 orange; X - Q2 就有 2 個 binding key,名字為 black 和 green。當訊息中的 路由鍵 和 這個 binding key 對應上的時候,那麼就知道了該訊息去到哪一個佇列中。

Topic Exchange

Topic Exchange 轉發訊息主要是根據萬用字元。 在這種交換器下,佇列和交換器的繫結會定義一種路由模式,那麼,萬用字元就要在這種路由模式和路由鍵之間匹配後交換器才能轉發訊息。

在這種交換器模式下:

  • 路由鍵必須是一串字元,用句號(.) 隔開,比如說 agreements.us,或者 agreements.eu.stockholm 等。
  • 路由模式必須包含一個 星號(*),主要用於匹配路由鍵指定位置的一個單詞,比如說,一個路由模式是這樣子:agreements..b.*,那麼就只能匹配路由鍵是這樣子的:第一個單詞是 agreements,第四個單詞是 b。 井號(#)就表示相當於一個或者多個單詞,例如一個匹配模式是agreements.eu.berlin.#,那麼,以agreements.eu.berlin開頭的路由鍵都是可以的。

具體程式碼傳送的時候還是一樣,第一個引數表示交換器,第二個引數表示routing key,第三個引數即訊息。如下:

rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

topic 和 direct 類似, 只是匹配上支援了"模式", 在"點分"的 routing_key 形式中, 可以使用兩個萬用字元:

  • *表示一個詞.
  • #表示零個或多個詞.

Headers Exchange

headers 也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的型別.
在佇列與交換器繫結時, 會設定一組鍵值對規則, 訊息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 訊息被投送到對應佇列.

Fanout Exchange

Fanout Exchange 訊息廣播的模式,不管路由鍵或者是路由模式,會把訊息發給繫結給它的全部佇列,如果配置了routing_key會被忽略。

 

 在Rabbit MQ中,無論是生產者傳送訊息還是消費者接受訊息,都首先需要宣告一個MessageQueue。這就存在一個問題,是生產者宣告還是消費者宣告呢?要解決這個問題,首先需要明確:

a)消費者是無法訂閱或者獲取不存在的MessageQueue中資訊。

b)訊息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。

在明白了上述兩點以後,就容易理解如果是消費者去宣告Queue,就有可能會出現在宣告Queue之前,生產者已傳送的訊息被丟棄的隱患。如果應用能夠通過訊息重發的機制允許訊息丟失,則使用此方案沒有任何問題。但是如果不能接受該方案,這就需要無論是生產者還是消費者,在傳送或者接受訊息前,都需要去嘗試建立訊息佇列。這裡有一點需要明確,如果客戶端嘗試建立一個已經存在的訊息佇列,Rabbit MQ不會做任何事情,並返回客戶端建立成功的。

       如果一個消費者在一個通道中正在監聽某一個佇列的訊息,Rabbit MQ是不允許該消費者在同一個channel去宣告其他佇列的。Rabbit MQ中,可以通過queue.declare命令宣告一個佇列,可以設定該佇列以下屬性:

a) Exclusive:排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:其一,排他佇列是基於連線可見的,同一連線的不同通道是可以同時訪問同一個連線建立的排他佇列的。其二,“首次”,如果一個連線已經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同。其三,即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除的。這種佇列適用於只限於一個客戶端傳送讀取訊息的應用場景。

b)   Auto-delete:自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。

 c)   Durable:持久化,這個會在後面作為專門一個章節討論。

d)  其他選項,例如如果使用者僅僅想查詢某一個佇列是否已存在,如果不存在,不想建立該佇列,仍然可以呼叫queue.declare,只不過需要將引數passive設為true,傳給queue.declare,如果該佇列已存在,則會返回true;如果不存在,則會返回Error,但是不會建立新的佇列。