1. 程式人生 > >RabbitMQ學習(3)- RabbitMQ結構

RabbitMQ學習(3)- RabbitMQ結構

目錄

RabbitMQ幾大元件

交換器型別

RabbitMQ執行流程

Connection與Channel

生產者執行流程

消費者執行流程

細說交換器


RabbitMQ幾大元件

  • 生產者:訊息建立者,將訊息傳送到訊息中介軟體的。
  • 訊息:包括有效載荷與標籤。有效載荷:要傳輸的資料;標籤:描述有效載荷的屬性;RabbitMQ通過標籤決定誰獲得該訊息,消費者只能得到有效載荷。
  • 消費者:是接收訊息的。
  • Brocker:是訊息中介軟體服務的節點。一個Brocker=一個RabbitMQ,一個伺服器上如果有多個RabbitMQ就有多個Brocker。
  • 佇列:是用來儲存訊息的。
  • 交換器:交換器僅僅對訊息進行轉發。交換器有不同的型別(如:fanout、direct、topic、headers)
  • 路由鍵:交換器與佇列的關係是通過繫結鍵(BindingKey,稱為路由鍵)建立的。
  • 繫結 

交換器型別

RabbitMQ常用的交換器型別有fanout、direct、topic、headers四種:

  • fanout:它會把所有傳送到該交換器的訊息,路由到所有與該交換器繫結的佇列中;
    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setUsername("rabbitstudy");
    factory.setPassword("123456");
    factory.setHost("192.168.0.1");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
    		
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "com.cdsn.test1"); 
    channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "com.cdsn.test2"); 
    
    		
    channel.basicPublish(EXCHANGE_NAME, "com.cdsn.test", 
    				MessageProperties.PERSISTENT_TEXT_PLAIN, 
    				"Hello World!".getBytes());
    channel.close();
    connection.close();
    }
  • direct:把訊息路由到那些BindingKey和RoutingKey完全匹配的佇列中;
  • topic:類似於direct,但可以使用萬用字元匹配規則;("#"可匹配多個或零個單詞,“*”可匹配單個單詞)
    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setUsername("rabbitstudy");
    factory.setPassword("123456");
    factory.setHost("192.168.0.1");
    factory.setPort(5672);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    		
    channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
    channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
    		
    channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.test"); 
    channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "#.test"); 
    
    		
    channel.basicPublish(EXCHANGE_NAME, "com.cdsn.test", 
    				MessageProperties.PERSISTENT_TEXT_PLAIN, 
    				"Hello World!".getBytes());
    
    channel.close();
    connection.close();
    }
  • header:訊息不根據路由鍵的匹配規則路由,而是根據傳送的訊息內容中的headers屬性進行匹配。

RabbitMQ執行流程

  • 生產者傳送訊息
  1. 生產者與Broker建立連線(Connection),開啟通道(Channel)

  2. 生產者宣告交換器(交換器型別、是否持久化、是否自動刪除等)

  3. 生產者宣告佇列(是否持久化、是否排他、是否自動刪除)

  4. 生產者通過路由鍵將交換器和佇列繫結

  5. 生產者傳送訊息至Broker(攜帶路由鍵等)

  6. 交換器根據接收到的路由鍵,以及交換器型別查詢匹配的佇列

  7. 找到,將訊息存入相對應的佇列中;找不到,則根據生產者的配置,選擇丟棄還是退回給生產者

  8. 關閉通道、關閉連線

  • 消費者接收訊息
  1. 消費者與Broker建立連線(Connection),開啟通道(Channel)
  2. 消費者向Broker請求消費相應佇列的訊息,可能設定回撥函式
  3. 等待Broker迴應並投遞相應佇列中的訊息,接收訊息
  4. 消費者確認(ack)接收到的訊息
  5. RabbitMQ從佇列中刪除相應已經被確認的訊息
  6. 關閉通道、關閉連線

Connection與Channel

RabbitMQ所有的AMQP指令都是通過通道完成的。

RabbitMQ的Channel與Netty中的NIO模型區別在於Channel是建立在TCP連線之上的。


生產者執行流程

  1. 當生產者與Broker建立連線時,呼叫factory.newConnection()方法。
  2. 當客戶端呼叫channel.createChannel()方法時,準備開啟通道。
  3. 傳送訊息時,呼叫channel.basicPublish()方法。              

 


消費者執行流程

  1. 當客戶端呼叫channel.basicConsume()方法時,向Broker告知準備好消費訊息。
  2. 客戶端呼叫channel.basicAck()方法,向Broker傳送確認通知。

細說交換器

交換器

 

交換器

 

         引數                                        作用
exchange 交換器名稱
type 交換器的型別,常見如fanout、direct、topic、headers
durable 設定是否持久化
autoDelete 設定是否自動刪除
internal

設定是否是內建的

argument 其他結構化引數

備註:

持久化:交換器的建立持久化到磁碟;

自動刪除:自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結,之後所有與這個交換器繫結的佇列或者交換器都與此解綁;

channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, false, null);// 建立一個自動刪除的交換器

內建交換器:客戶端程式無法直接傳送訊息到交換器,只能通過交換器路由到內建交換器。


(1)不等待伺服器通知建立佇列成功命令:

    void channel.exchangeDeclareNoWait(String exchange,String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

(2)檢測交換器是否存在,如果不存在則丟擲異常:

    Exchange.DeclareOK exchangeDeclarePassive(String name) throws IOException;// 如果要用到這方法,需要解決這個異常

(3)刪除交換器:

    ①  Exchange.DeleteOK exchangeDelete(String exchange) throws IOException;

    ②  void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

    ③  Exchange.DeleteOK exchangeDelete(String exchange, boolean ifUnused) throws IOException;

備註:

        exchange標識交換器的名稱

        ifUnused用來設定是否在交換器沒有被使用的情況下刪除