1. 程式人生 > >訊息中介軟體——RabbitMQ(六)理解Exchange交換機核心概念!

訊息中介軟體——RabbitMQ(六)理解Exchange交換機核心概念!

前言

來了解RabbitMQ一個重要的概念:Exchange交換機

1. Exchange概念

  • Exchange:接收訊息,並根據路由鍵轉發訊息所繫結的佇列。

藍色框:客戶端傳送訊息至交換機,通過路由鍵路由至指定的佇列。
黃色框:交換機和佇列通過路由鍵有一個繫結的關係。
綠色框:消費端通過監聽佇列來接收訊息。

2. 交換機屬性

Name:交換機名稱
Type:交換機型別——direct、topic、fanout、headers、sharding(此篇不講)
Durability:是否需要持久化,true為持久化
Auto Delete:當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
Internal

:當前Exchange是否用於RabbitMQ內部使用,預設為false
Arguments:擴充套件引數,用於擴充套件AMQP協議自定製化使用

3. Direct Exchange(直連)

  • 所有傳送到Direct Exchange的訊息被轉發到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何繫結(binding)操作,訊息傳遞時,RouteKey必須完全匹配才會被佇列接收,否則該訊息會被拋棄。

重點:routing key與佇列queues 的key保持一致,即可以路由到對應的queue中。

3.1 程式碼演示

生產端:


/**
 * 
* @ClassName: Producer4DirectExchange 
* @Description: 生產者
* @author Coder程式設計
* @date2019年7月19日 下午22:15:52 
*
 */
public class Producer4DirectExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2建立Channel
        Channel channel = connection.createChannel();  
        //3 宣告
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //4 傳送
        String msg = "Coder程式設計 Hello World RabbitMQ 4  Direct Exchange Message ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
    }
}

消費端:


/**
 * 
* @ClassName: Consumer4DirectExchange 
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午22:18:52 
*
 */
public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        
        
        //建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();  
        //宣告
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        
        //表示聲明瞭一個交換機
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示聲明瞭一個佇列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一個繫結關係:
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //迴圈獲取訊息  
        while(true){  
            //獲取訊息,如果沒有訊息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到訊息:" + msg);  
        } 
    }
}

測試結果:

注意需要routingKey保持一致。可以自己嘗試修改routingkey,是否能收到訊息。

4. Topic Exchange

  • 所有傳送到Topic Exchange的訊息被轉發到所有管線RouteKey中指定Topic的Queue上
  • Exchange將RouteKey和某Topic進行模糊匹配,此時佇列需要繫結一個Topic

    注意:可以使用萬用字元進行模糊匹配
    符號 "#" 匹配一個或多個詞
    符號 "" 匹配不多不少一個詞
    例如:"log.#" 能夠匹配到 "log.info.oa"
    "log.
    " 只會匹配到 "log.error"

在一堆訊息中,每個不同的佇列只關心自己需要的訊息。

4.1 程式碼演示

生產端:


/**
 * 
* @ClassName: Producer4TopicExchange 
* @Description: 生產者
* @author Coder程式設計
* @date2019年7月19日 下午22:32:41
*
 */
public class Producer4TopicExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2建立Channel
        Channel channel = connection.createChannel();  
        //3宣告
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //4傳送
        String msg = "Coder程式設計 Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }
}

消費端:


/**
 * 
* @ClassName: Consumer4TopicExchange 
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午22:37:12
*
 */
public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {
        
        
        //建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();  
        // 宣告
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //String routingKey = "user.*";
        String routingKey = "user.*";
        // 1 宣告交換機 
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 宣告佇列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交換機和佇列的繫結關係:
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer);  
        //迴圈獲取訊息  
        while(true){  
            //獲取訊息,如果沒有訊息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到訊息:" + msg);  
        } 
    }
}

測試結果:

注意一個問題:需要進行解綁

5. Fanout Exchange

  • 不處理路由鍵,只需要簡單的將隊裡繫結到交換機上
  • 傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上
  • Fanout交換機轉發訊息是最快的

5.1 程式碼演示

生產端:



/**
 * 
* @ClassName: Producer4FanoutExchange 
* @Description: 生產者
* @author Coder程式設計
* @date2019年7月19日 下午23:01:16
*
 */
public class Producer4FanoutExchange {

    
    public static void main(String[] args) throws Exception {
        
        //1建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        //2 建立Channel
        Channel channel = connection.createChannel();  
        //3 宣告
        String exchangeName = "test_fanout_exchange";
        //4 傳送
        for(int i = 0; i < 10; i ++) {
            String msg = "Coder 程式設計  Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());          
        }
        channel.close();  
        connection.close();  
    }
    
}

消費端:


/**
 * 
* @ClassName: Consumer4FanoutExchange 
* @Description: 消費者
* @author Coder程式設計
* @date2019年7月19日 下午23:21:18
*
 */
public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {
        
        //建立ConnectionFactory
        Connection connection = ConnectionUtils.getConnection();
        
        Channel channel = connection.createChannel();  
        // 宣告
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; //不設定路由鍵
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, true, consumer); 
        //迴圈獲取訊息  
        while(true){  
            //獲取訊息,如果沒有訊息,這一步將會一直阻塞  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("收到訊息:" + msg);  
        } 
    }
}

測試結果:


6. 其他

6.1 Bingding —— 繫結

  • Exchange和Exchange、Queue之間的連線關係
  • Bingding可以包含RoutingKey或者引數

6.2 Queue——訊息佇列

  • 訊息佇列,實際儲存訊息資料
  • Durability:是否持久化,Durable:是 ,Transient:否
  • Auto delete:如選yes,代表當最後一個監聽被移除之後,該Queue會自動被刪除。

6.3 Message——訊息

  • 伺服器與應用程式之間傳送的資料
  • 本質上就是一段資料,由Properties和Payload(Body)組成
  • 常用屬性:delivery mode、headers(自定義屬性)

6.4 其他屬性

content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id

timestamp、type、user_id、app_id、cluster_id

6.5 Virtual Host虛擬主機

  • 虛擬地址,用於進行邏輯隔離,最上層的訊息路由
  • 一個Virtual Host裡面可以有若干個Exchange和Queue
  • 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue

7. 總結

RabbitMQ的概念、安裝與使用、管控臺操作、結合RabbitMQ的特性、Exchange、Queue、Binding
、RoutingKey、Message進行核銷API的講解,通過本章的學習,希望大家對RabbitMQ有一個初步的認識。

文末

歡迎關注個人微信公眾號:Coder程式設計
獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq群:315211365,歡迎大家進群交流一起學習。謝謝了!也可以介紹給身邊有需要的朋友。

文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~

參考文章:

《RabbitMQ訊息中介軟體精講》

推薦文章:

訊息中介軟體——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!

訊息中介軟體——RabbitMQ(四)命令列與管控臺的基本操作!

訊息中介軟體——RabbitMQ(五)快速入門生產者與消費者,SpringBoot整合RabbitMQ