1. 程式人生 > >學習之路-RabbitMQ(三):RabbitMQ的工作模式

學習之路-RabbitMQ(三):RabbitMQ的工作模式

RabbitMQ有以下幾種工作模式 :
1、Work queues 工作佇列
2、Publish/Subscribe 釋出訂閱模式
3、Routing 路由模式
4、Topics 萬用字元模式
5、Header
6、RPC

一:Work queues 工作佇列
在這裡插入圖片描述
work queues與入門程式相比,多了一個消費端,兩個消費端共同消費同一個佇列中的訊息。 應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。
程式碼:工作佇列模式
測試:
1、使用入門程式,啟動多個消費者。
2、生產者傳送多個訊息。
結果:
1、一條訊息只會被一個消費者接收;
2、rabbit採用輪詢的方式將訊息是平均傳送給消費者的;
3、消費者在處理完某條訊息後,才會收到下一條訊息。
二:Publish/Subscribe 釋出訂閱模式


在這裡插入圖片描述

釋出訂閱模式:
1、每個消費者監聽自己的佇列。
2、生產者將訊息發給broker,由交換機將訊息轉發到繫結此交換機的每個佇列,每個繫結交換機的佇列都將接收 到訊息

程式碼:Publish/Subscribe 釋出訂閱模式

1、publish/subscribe與work queues有什麼區別。
區別:
1)work queues不用定義交換機,而publish/subscribe需要定義交換機。
2)publish/subscribe的生產方是面向交換機發送訊息,work queues的生產方是面向佇列傳送訊息(底層使用預設 交換機)。
3)publish/subscribe需要設定佇列和交換機的繫結,work queues不需要設定,實質上work queues會將佇列綁 定到預設的交換機 。
相同點: 所以兩者實現的釋出/訂閱的效果是一樣的,多個消費端監聽同一個佇列不會重複消費訊息。
2、實質工作用什麼 publish/subscribe還是work queues。
建議使用 publish/subscribe,釋出訂閱模式比工作佇列模式更強大,並且釋出訂閱模式可以指定自己專用的交換
機。
三:Routing 路由模式


路由工作模式
路由模式:
1、每個消費者監聽自己的佇列,並且設定routingkey。
2、生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列。

程式碼:Routing 路由模式

1、Routing模式和Publish/subscibe有啥區別?
Routing模式要求佇列在繫結交換機時要指定routingkey,訊息會轉發到符合routingkey的佇列。

四:Topics 萬用字元模式
路由模式:
1、每個消費者監聽自己的佇列,並且設定帶統配符的routingkey。
2、生產者將訊息發給broker,由交換機根據routingkey來轉發訊息到指定的佇列。
在這裡插入圖片描述
案例:
根據使用者的通知設定去通知使用者,設定接收Email的使用者只接收Email,設定接收sms的使用者只接收sms,設定兩種 通知型別都接收的則兩種通知都有效。
程式碼:

Topics 萬用字元模式
Topics與Routing的區別:
Topics與Routing的基本原理相同,即生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列。
不同之處是:routingKey的匹配方式不相同
Routing模式是相等匹配,Topics是萬用字元匹配
符號 # :匹配一個或者是多個詞,比如:inform.#可以匹配inform.sms,inform.email,inform.sms.email
符號 :只能匹配一個詞,比如inform. 可以匹配inform.sms,inform.email
四:Header模式
header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配
佇列。
案例:根據使用者的通知設定去通知使用者,設定接收Email的使用者只接收Email,設定接收sms的使用者只接收sms,設定兩種 通知型別都接收的則兩種通知都有效。
程式碼:
1)生產者 佇列與交換機繫結的程式碼與之前不同,如下:

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知:

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");
//匹配email通知消費者繫結的header 
headers.put("inform_type", "sms");
//匹配sms通知消費者繫結的header 
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

傳送郵件消費者:

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>(); 
headers_email.put("inform_email", "email");
//交換機和佇列繫結 
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 
//指定消費佇列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);  

六:RPC遠端呼叫模式
在這裡插入圖片描述
RPC即客戶端遠端呼叫服務端的方法 ,使用MQ可以實現RPC的非同步呼叫,基於Direct交換機實現,流程如下:
1、客戶端即是生產者就是消費者,向RPC請求佇列傳送RPC呼叫訊息,同時監聽RPC響應佇列。
2、服務端監聽RPC請求佇列的訊息,收到訊息後執行服務端的方法,得到方法返回的結果
3、服務端將RPC方法 的結果傳送到RPC響應佇列
4、客戶端(RPC呼叫方)監聽RPC響應佇列,接收到RPC呼叫結果。