1. 程式人生 > >RabbitMQ學習筆記三:Exchange的學習(1)

RabbitMQ學習筆記三:Exchange的學習(1)

一、概述

    上一篇文章中講述了一個簡單的訊息傳遞模型,訊息從生產者傳送到消費者再發送到佇列,實際的工作中生產者不知道要把訊息傳送給哪個佇列,可能有多個消費者要生產者的訊息,也可能有的消費者不需要生產者的全部訊息,比如日誌系統,一個消費者需要info級別的資訊,另一個消費者需要error和info級別的資訊,這時候我們就用到了交換機,生產者把訊息傳送到交換機,交換機像是一個訊息中轉站,一邊接收生產者的訊息,一邊將訊息根據繫結佇列的路由規則傳送給指定的佇列,這種我們稱作“釋出/訂閱”模式。

二、交換機

    這是這篇文章的主題,在學習筆記一種有說到,一共有四種交換機,fanout(分發),topic(匹配),direct(直連),header(主題),這篇文章中,我們先探討fanout型別的交換機。

    我們可以先從管理臺上看一下RabbitMQ為我們提供的交換機,如下圖:

    
     圖上所有amq.*的交換機,都是RabbitMQ為我們建立的交換機。

    匿名交換機:這裡有個概念要說明一下,在上篇文章中我們傳送訊息時並沒有使用交換機,但是訊息依然傳送到了指定的佇列,這是因為RabbitMQ為我們建立了一個“”空字串的匿名交換機,我們看下上篇文章中的傳送訊息的程式碼

channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));

    我們在Channel類中檢視一下這個函式的宣告引數型別及含義  

    /**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

    可以看到第一個引數就是交換機,第二個引數是路由規則,當時寫的時候我還好奇,為什麼路由規則這個引數我們傳參傳的是佇列的名稱。原來是因為RabbitMQ為我們建立了一個“”空字串的匿名交換機,此時,訊息預設傳送到路由規則同名的佇列上,這也就是上篇文章中訊息能夠成功傳送到我們指定佇列上的原因。

三、臨時佇列

    上一篇文章中我們定義了一個靜態的成員變數QUEUE_NAME,指定了佇列的名字為HelloMq,RabbitMQ為我們建立了這個佇列。實際工作中,我們可能不需要指定佇列的名字,或者說我們不需要使用已經存在的佇列,這時候我們就可以使用RabbitMQ建立的臨時佇列。每當我們連線到RabbitMQ時,都建立一個新的空佇列,並且讓RabbitMQ隨機選擇一個名字給我們,並且在所有消費者斷開的時候,佇列自動刪除。     
String queueName = channel.queueDeclare().getQueue();

    JAVA中使用queueDeclare().getQueue()方法建立一個隨機的非持久化佇列。

四、路由規則

    前邊我們說到交換機根據路由規則傳送訊息到指定佇列,這裡的功能實現是佇列與交換機之間事先綁定了一個路由規則,當訊息傳送過來的時候,交換機根據路由規則匹配到相應的佇列上。佇列與交換機之間繫結路由規則程式碼如下: 

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

    第一個引數是佇列的名稱,第二個引數是交換機的名稱,第三個引數是要繫結的路由規則,可以根據不同的佇列定義不同的繫結規則而實現不同的訊息傳送到不同的佇列上。當然,本文中學習到的fanout型別的交換機會自動忽視繫結規則而將訊息傳送到所有與交換機繫結的佇列上去。

五、原始碼

    本示例採用文章開頭的配圖結構,一個生產者,一個交換機,兩個隨機臨時佇列,兩個消費者。

    先寫訊息生產者,由訊息生產者建立一個名為“HelloMq”的fanout型別交換機,我們可以看到傳送訊息的方法第一個引數變為交換機的名稱了,由於交換機型別為fanout,會忽略繫結規則,因此第二個引數為空。

package com.cn.chenxyt.mq;

import java.io.IOException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;


public class MqProducer {
	 public final static String EXCHANGE_NAME="HelloMq";
	 public static void main(String[] args) throws IOException, InterruptedException {
		 //建立連線工廠
		 ConnectionFactory factory = new ConnectionFactory();
		 //設定主機、使用者名稱、密碼和客戶端埠號
		 factory.setHost("localhost");
		 factory.setUsername("guest");
		 factory.setPassword("guest");
		 factory.setPort(5672);
		 //建立一個新的連線 即TCP連線
		 Connection connection = factory.newConnection();
		 //建立一個通道
		 Channel channel = connection.createChannel();
		 //建立一個交換機
		 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
		 String message = "Hello World";
		 while (true){
			 //傳送訊息
			 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
	         System.out.println("Producer Send +'" + message + "!");
	         Thread.sleep(2000);
		 }
	}
}

    接下來我們建立消費者1,消費者1聲明瞭一個隨機臨時佇列,並繫結到了交換機上,如前邊所述,交換機型別為fanout,因此繫結規則為空。

package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
	 public final static String EXCHANGE_NAME="HelloMq";
	 public static void main(String[] args) throws IOException {
		 //建立連線工廠
		 ConnectionFactory factory = new ConnectionFactory();
		 //設定主機
		 factory.setHost("localhost");
		 //建立一個新的連線 即TCP連線
		 Connection connection = factory.newConnection();
		 //建立一個通道
		 Channel channel = connection.createChannel();
		 //建立隨機佇列
		 String queueName = channel.queueDeclare().getQueue();
		 //繫結佇列到交換機
		 channel.queueBind(queueName, EXCHANGE_NAME, "");
	     System.out.println("Consumer1 Waiting Received messages");
	     //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
	     //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
	     Consumer consumer = new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope,
	                                       BasicProperties properties, byte[] body)
	                    throws IOException {
	                String message = new String(body, "UTF-8");
	                System.out.println("Consumer1 Received '" + message + "'");
	            }
	        };
	        //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
	        channel.basicConsume(queueName, true, consumer);
	}
}

    接下來建立消費者2,消費者2與消費者1相同,各自建立了一個隨機臨時佇列並繫結在交換機上,為了便於區分,列印日誌處區分了名字。

package com.cn.chenxyt.mq;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
	 public final static String EXCHANGE_NAME="HelloMq";
	 public static void main(String[] args) throws IOException {
		 //建立連線工廠
		 ConnectionFactory factory = new ConnectionFactory();
		 //設定主機
		 factory.setHost("localhost");
		 //建立一個新的連線 即TCP連線
		 Connection connection = factory.newConnection();
		 //建立一個通道
		 Channel channel = connection.createChannel();
		 //建立隨機佇列
		 String queueName = channel.queueDeclare().getQueue();
		 //繫結佇列到交換機
		 channel.queueBind(queueName, EXCHANGE_NAME, "");
	     System.out.println("Consumer2 Waiting Received messages");
	     //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
	     //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
	     Consumer consumer = new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope,
	                                       BasicProperties properties, byte[] body)
	                    throws IOException {
	                String message = new String(body, "UTF-8");
	                System.out.println("Consumer2 Received '" + message + "'");
	            }
	        };
	        //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
	        channel.basicConsume(queueName, true, consumer);
	}
}


    接下來我們分別啟動消費者1和消費者2,使他們處在監聽狀態。

   

        同時我們在RabbitMQ的管理臺可以看到新建了兩條名字隨機由RabbitMQ生成的佇列,並且當消費者1和消費者2服務斷開時,這兩個佇列會被刪除

    

    接下來我們啟動訊息生產者,每2s傳送一條訊息

    

    同時可以看到消費者1和消費者2都收到了相同的訊息,並且RabbitMQ管理臺新建了一個名為“HelloMq”型別為"fanout"的交換機 

    

        

    

     如上所示,我們就完成了使用fanout型別的交換機進行訊息的“釋出/訂閱”,總結一下,我們可以使用fanout型別的交換機進行訊息的廣播發送。

六、程式碼下載