訊息中介軟體——RabbitMQ(六)理解Exchange交換機核心概念!
前言
來了解RabbitMQ一個重要的概念:Exchange交換機
1. Exchange概念
- Exchange:接收訊息,並根據路由鍵轉發訊息所繫結的佇列。
藍色框:客戶端傳送訊息至交換機,通過路由鍵路由至指定的佇列。
黃色框:交換機和佇列通過路由鍵有一個繫結的關係。
綠色框:消費端通過監聽佇列來接收訊息。
2. 交換機屬性
Name
:交換機名稱
Type
:交換機型別——direct、topic、fanout、headers、sharding(此篇不講)
Durability
:是否需要持久化,true為持久化
Auto Delete
:當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
Internal
Arguments
:擴充套件引數,用於擴充套件AMQP協議自定製化使用
3. Direct Exchange(直連)
- 所有傳送到Direct Exchange的訊息被轉發到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何繫結(binding)操作,訊息傳遞時,RouteKey必須完全匹配才會被佇列接收,否則該訊息會被拋棄。
重點:routing key與佇列queues 的key保持一致,即可以路由到對應的queue中。
3.1 程式碼演示
生產端:
/** * * @ClassName: Producer4DirectExchange * @Description: 生產者 * @author Coder程式設計 * @date2019年7月19日 下午22:15:52 * */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2建立Channel Channel channel = connection.createChannel(); //3 宣告 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //4 傳送 String msg = "Coder程式設計 Hello World RabbitMQ 4 Direct Exchange Message ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
消費端:
/**
*
* @ClassName: Consumer4DirectExchange
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午22:18:52
*
*/
public class Consumer4DirectExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//宣告
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//表示聲明瞭一個交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
//表示聲明瞭一個佇列
channel.queueDeclare(queueName, false, false, false, null);
//建立一個繫結關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//迴圈獲取訊息
while(true){
//獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
}
}
測試結果:
注意需要routingKey保持一致。可以自己嘗試修改routingkey,是否能收到訊息。
4. Topic Exchange
- 所有傳送到Topic Exchange的訊息被轉發到所有管線RouteKey中指定Topic的Queue上
Exchange將RouteKey和某Topic進行模糊匹配,此時佇列需要繫結一個Topic
注意:可以使用萬用字元進行模糊匹配
符號 "#" 匹配一個或多個詞
符號 "" 匹配不多不少一個詞
例如:"log.#" 能夠匹配到 "log.info.oa"
"log." 只會匹配到 "log.error"
在一堆訊息中,每個不同的佇列只關心自己需要的訊息。
4.1 程式碼演示
生產端:
/**
*
* @ClassName: Producer4TopicExchange
* @Description: 生產者
* @author Coder程式設計
* @date2019年7月19日 下午22:32:41
*
*/
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
//1建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2建立Channel
Channel channel = connection.createChannel();
//3宣告
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//4傳送
String msg = "Coder程式設計 Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
}
}
消費端:
/**
*
* @ClassName: Consumer4TopicExchange
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午22:37:12
*
*/
public class Consumer4TopicExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 宣告
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
//String routingKey = "user.*";
String routingKey = "user.*";
// 1 宣告交換機
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 2 宣告佇列
channel.queueDeclare(queueName, false, false, false, null);
// 3 建立交換機和佇列的繫結關係:
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//迴圈獲取訊息
while(true){
//獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
}
}
測試結果:
注意一個問題:需要進行解綁
5. Fanout Exchange
- 不處理路由鍵,只需要簡單的將隊裡繫結到交換機上
- 傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上
- Fanout交換機轉發訊息是最快的
5.1 程式碼演示
生產端:
/**
*
* @ClassName: Producer4FanoutExchange
* @Description: 生產者
* @author Coder程式設計
* @date2019年7月19日 下午23:01:16
*
*/
public class Producer4FanoutExchange {
public static void main(String[] args) throws Exception {
//1建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2 建立Channel
Channel channel = connection.createChannel();
//3 宣告
String exchangeName = "test_fanout_exchange";
//4 傳送
for(int i = 0; i < 10; i ++) {
String msg = "Coder 程式設計 Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, "", null , msg.getBytes());
}
channel.close();
connection.close();
}
}
消費端:
/**
*
* @ClassName: Consumer4FanoutExchange
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午23:21:18
*
*/
public class Consumer4FanoutExchange {
public static void main(String[] args) throws Exception {
//建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 宣告
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = ""; //不設定路由鍵
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//durable 是否持久化訊息
QueueingConsumer consumer = new QueueingConsumer(channel);
//引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//迴圈獲取訊息
while(true){
//獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
}
}
測試結果:
6. 其他
6.1 Bingding —— 繫結
- Exchange和Exchange、Queue之間的連線關係
- Bingding可以包含RoutingKey或者引數
6.2 Queue——訊息佇列
- 訊息佇列,實際儲存訊息資料
- Durability:是否持久化,Durable:是 ,Transient:否
- Auto delete:如選yes,代表當最後一個監聽被移除之後,該Queue會自動被刪除。
6.3 Message——訊息
- 伺服器與應用程式之間傳送的資料
- 本質上就是一段資料,由Properties和Payload(Body)組成
- 常用屬性:delivery mode、headers(自定義屬性)
6.4 其他屬性
content_type、content_encoding、priority
correlation_id、reply_to、expiration、message_id
timestamp、type、user_id、app_id、cluster_id
6.5 Virtual Host虛擬主機
- 虛擬地址,用於進行邏輯隔離,最上層的訊息路由
- 一個Virtual Host裡面可以有若干個Exchange和Queue
- 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue
7. 總結
RabbitMQ的概念、安裝與使用、管控臺操作、結合RabbitMQ的特性、Exchange、Queue、Binding
、RoutingKey、Message進行核銷API的講解,通過本章的學習,希望大家對RabbitMQ有一個初步的認識。
文末
歡迎關注個人微信公眾號:Coder程式設計
獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq群:315211365,歡迎大家進群交流一起學習。謝謝了!也可以介紹給身邊有需要的朋友。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
參考文章:
《RabbitMQ訊息中介軟體精講》
推薦文章:
訊息中介軟體——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!
訊息中介軟體——RabbitMQ(四)命令列與管控臺的基本操作!
訊息中介軟體——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ