1. 程式人生 > >RabbitMQ六種工作模式有哪些?怎樣用SpringBoot整合RabbitMQ

RabbitMQ六種工作模式有哪些?怎樣用SpringBoot整合RabbitMQ

目錄

  • 一、RabbitMQ入門程式
  • 二、Work queues 工作模式
  • 三、Publish / Subscribe 釋出/訂閱模式
  • 四、Routing 路由模式
  • 五、Topics
  • 六、Header
  • 七、RPC
  • 八、Spring Data Elasticsearch

一、RabbitMQ入門程式

<dependencies>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
      </dependency>

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
      </dependency>
</dependencies>

application.yml

server:
  port: 44000
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672
    virtual-host: /

訊息傳送者

/**
 * Description: rabbitmq入門程式
 *
 * @author zygui
 * @date Created on 2020/5/13 15:34
 */
public class Producer01 {

    // 宣告一個訊息佇列名稱
    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) {
        // 通過連線工廠建立新的連線與mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/"); // 預設為 / 即可

        // 建立連線
        Connection connection = null;
        // 建立通道(目的是為了複用連線)
        Channel channel = null;
        try {

            //建立新連線
            connection = connectionFactory.newConnection();
            //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
            channel = connection.createChannel();

            //宣告佇列,如果佇列在mq 中沒有則要建立
            //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 引數明細
             * 1、queue 佇列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
             * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
             * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
             * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
             */
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);

            // 傳送訊息
            //引數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 引數明細:
             * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
             * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
             * 3、props,訊息的屬性
             * 4、body,訊息內容
             */
            //訊息內容
            String message = "hello world 桂朝陽";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("send to mq "+message);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            try {
                // 關閉通道
                channel.close();
                // 關閉連線
                connection.close();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

訊息接收者

/**
 * Description: rabbitmq入門程式
 *
 * @author zygui
 * @date Created on 2020/5/13 15:45
 */
public class Consumer01 {

    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {

        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        //監聽佇列
        //宣告佇列,如果佇列在mq 中沒有則要建立
        //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

入門程式步驟

二、Work queues 工作模式


三、Publish / Subscribe 釋出/訂閱模式


訊息生產者

public class Producer02_publish {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 交換機名稱
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args) {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連線
            connection = connectionFactory.newConnection();
            //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
            channel = connection.createChannel();
            //宣告佇列,如果佇列在mq 中沒有則要建立
            //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 引數明細
             * 1、queue 佇列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
             * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
             * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
             * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //宣告一個交換機
            //引數:String exchange, String type
            /**
             * 引數明細:
             * 1、交換機的名稱
             * 2、交換機的型別
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

            //進行交換機和佇列繫結
            //引數:String queue, String exchange, String routingKey
            /**
             * 引數明細:
             * 1、queue 佇列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
            //傳送訊息
            //引數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 引數明細:
             * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
             * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
             * 3、props,訊息的屬性
             * 4、body,訊息內容
             */
            for(int i=0;i<5;i++){
                //訊息內容
                String message = "send inform message to user";
                channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
                System.out.println("send to mq "+message);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連線
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
}

訊息接收者1

public class Consumer02_subscribe_email {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}

訊息接收者2

public class Consumer02_subscribe_sms {
    //佇列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

四、Routing 路由模式


訊息生產者

public class Producer03_routing {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    // 路由鍵名稱
    private static final String ROUTINGKEY_EMAIL="inform_email";
    private static final String ROUTINGKEY_SMS="inform_sms";

    public static void main(String[] args) {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連線
            connection = connectionFactory.newConnection();
            //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
            channel = connection.createChannel();

            //宣告佇列,如果佇列在mq 中沒有則要建立
            //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 引數明細
             * 1、queue 佇列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
             * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
             * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
             * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);

            //宣告一個交換機
            //引數:String exchange, String type
            /**
             * 引數明細:
             * 1、交換機的名稱
             * 2、交換機的型別
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

            //進行交換機和佇列繫結
            //引數:String queue, String exchange, String routingKey
            /**
             * 引數明細:
             * 1、queue 佇列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
            //channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
            //channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");

            //傳送訊息
            //引數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 引數明細:
             * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
             * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
             * 3、props,訊息的屬性
             * 4、body,訊息內容
             */
           /* for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                System.out.println("send to mq "+message);
            }*/
            for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
                System.out.println("send to mq "+message);
            }

           // 此時指定的路由鍵是 inform, 所以兩個消費者都可以消費
            /*for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send inform message to user";
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());
                System.out.println("send to mq "+message);
            }*/

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連線
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

訊息接收者1

public class Consumer03_routing_email {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    // 交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    // 路由鍵名稱
    private static final String ROUTINGKEY_EMAIL="inform_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}
http://www.dtmao.cc/news_show_631033.shtml

訊息接收者2

public class Consumer03_routing_sms {
    //佇列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    private static final String ROUTINGKEY_SMS="inform_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

五、Topics

public class Producer04_topics {
    // 佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 宣告交換機
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    // 使用萬用字元的方式來,設定路由鍵
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    public static void main(String[] args) {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連線
            connection = connectionFactory.newConnection();
            //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
            channel = connection.createChannel();
            //宣告佇列,如果佇列在mq 中沒有則要建立
            //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 引數明細
             * 1、queue 佇列名稱
             * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
             * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
             * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
             * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //宣告一個交換機
            //引數:String exchange, String type
            /**
             * 引數明細:
             * 1、交換機的名稱
             * 2、交換機的型別
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //進行交換機和佇列繫結
            //引數:String queue, String exchange, String routingKey
            /**
             * 引數明細:
             * 1、queue 佇列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
            //傳送訊息
            //引數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 引數明細:
             * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"")
             * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱
             * 3、props,訊息的屬性
             * 4、body,訊息內容
             */
            for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //傳送訊息的時候指定routingKey
                String message = "send sms and email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連線
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

訊息接收者1

public class Consumer04_topics_email {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}

小寫接收者2

public class Consumer04_topics_sms {
    //佇列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//埠
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}

六、Header

七、RPC

八、Spring Data Elasticsearch

rabbitmq-producer 訊息傳送者

@Configuration
public class RabbitMQConfig {
    // 宣告兩個佇列常量
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    // 宣告交換機常量
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    // 宣告兩個路由鍵常量
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //宣告交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // 宣告佇列

    //宣告QUEUE_INFORM_EMAIL佇列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //宣告QUEUE_INFORM_SMS佇列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    // 繫結交換機和佇列
    //ROUTINGKEY_EMAIL佇列繫結交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS佇列繫結交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

rabbitmq-consumer 訊息接收者

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //宣告交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啟之後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    // 宣告佇列

    //宣告QUEUE_INFORM_EMAIL佇列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //宣告QUEUE_INFORM_SMS佇列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    // 繫結交換機和佇列
    //ROUTINGKEY_EMAIL佇列繫結交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS佇列繫結交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}
@SpringBootApplication
@EnableRabbit
public class TestRabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestRabbitMQApplication.class, args);
    }
}

監聽訊息佇列

@Component
public class ReceiveHandler {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
    public void receiveMsg(String msg) {
        System.out.println("接收到的訊息是 = " + msg);
    }
}

在rabbitmq-provider中測試

@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {

    // 使用rabbitTemplate傳送訊息
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendEmail() {
        String message = "send email message to user";
        /**
         * arg1: 交換機名稱
         * arg2: 路由鍵
         * arg3: 訊息內容
         */
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
    }

}