1. 程式人生 > >Springboot+RabbitMQ整合示例

Springboot+RabbitMQ整合示例

一、RabbitMQ簡介

         MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。

1、MQ特點: MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA

訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。

2、含義:RabbitMQ是一個在AMQP基礎上完成的,可複用的企業訊息系統。他遵循Mozilla Public License開源協議

3、概念:RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹。

RabbitMQ的結構圖如下:

Broker:簡單來說就是訊息佇列伺服器實體。

  •   Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
  •   Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
  •   Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
  •   Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
  •   vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
  •   producer:訊息生產者,就是投遞訊息的程式。
  •   consumer:訊息消費者,就是接受訊息的程式。
  •   channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。

訊息佇列的使用過程大概如下:

  •        客戶端連線到訊息佇列伺服器,開啟一個channel。
  •   客戶端宣告一個exchange,並設定相關屬性。
  •   客戶端宣告一個queue,並設定相關屬性。
  •   客戶端使用routing key,在exchange和queue之間建立好繫結關係。
  •   客戶端投遞訊息到exchange。

      exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。

      exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。

       RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:

  •   exchange持久化,在宣告時指定durable => 1
  •   queue持久化,在宣告時指定durable => 1
  •   訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

        如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。

二、Springboot整合RabbitMQ

1、新增pom.xml依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example.demo.rabbitmq</groupId>
	<artifactId>rabbitmq-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rabbitmq-demo</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.5.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

2、application.yml配置檔案主要是對rabbimq的配置資訊

server:
  port: 8081

spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 192.168.0.132
    port: 5672
    username: admin
    password: admin

3、初始化建立佇列、轉發器,並把佇列繫結到轉發器(RabbitConfig.java)

package com.example.demo.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 路徑:com.example.demo.rabbitmq.config
 * 類名:
 * 功能:佇列配置
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/23 21:46
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    //===============以下是驗證topic Exchange的佇列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //===============以上是驗證topic Exchange的佇列==========


    //===============以下是驗證Fanout Exchange的佇列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是驗證Fanout Exchange的佇列==========


    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 將佇列topic.message與exchange繫結,binding_key為topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 將佇列topic.messages與exchange繫結,binding_key為topic.#,模糊匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

4、最簡單的hello生產和消費實現(單生產者和單消費者)

生產者:

package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:生產者
 * 備註:單生產者-單消費者
 * 建立人:typ
 * 建立時間:2018/9/23 21:49
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloSender {

    private static final Logger log = LoggerFactory.getLogger(HelloSender.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(){
        String context = "hello " + new Date();
        log.info("Sender:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消費者:

package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:消費者
 * 備註:單生產者-單消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:14
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloReceiver {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver.class);

    //監聽器監聽指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver:"+hello);

    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToOne.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:《用一句描述一下》
 * 備註:單生產者-單消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:35
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitOneToOneTest {

    @Autowired
    private HelloSender helloSender;

    @PostMapping("/hello")
    public void hello(){
        helloSender.send();
    }
}

啟動程式,執行:

結果如下:

Sender : hello1 Thu September 24 17:23:31 CST 2018
Receiver  : hello1 Thu September 24 17:23:31 CST 2018

5、單生產者-多消費者

生產者:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:生產者
 * 備註:單生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 21:49
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloSender1 {

    private static final Logger log = LoggerFactory.getLogger(HelloSender1.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("Sender1:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消費者1:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:消費者1
 * 備註:單生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:14
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloReceiver1 {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver1.class);

    //監聽器監聽指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver1:"+hello);
    }

}

消費者2:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:消費者2
 * 備註:單生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:14
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloReceiver2 {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver2.class);

    //監聽器監聽指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver2:"+hello);

    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToMany.HelloSender1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:《用一句描述一下》
 * 備註:單生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:35
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitOneToManyTest {

    @Autowired
    private HelloSender1 helloSender;
    
    /**
     * 方法名:
     * 功能:單生產者-多消費者
     * 描述:
     * 建立人:typ
     * 建立時間:2018/9/23 22:46
     * 修改人:
     * 修改描述:
     * 修改時間:
     */
    @PostMapping("/oneToMany")
    public void ontToMany(){
        for (int i=0;i<10;i++){
            helloSender.send("hello smg:"+i);
        }
    }
}

6、多生產者-多消費者

生產者1:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:生產者1
 * 備註:多生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 21:49
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloSenderA {

    private static final Logger log = LoggerFactory.getLogger(HelloSenderA.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("SenderA:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

生產者2:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:生產者2
 * 備註:多生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 21:49
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloSenderB {

    private static final Logger log = LoggerFactory.getLogger(HelloSenderB.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("SenderB:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消費者1:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:消費者1
 * 備註:多生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:14
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloReceiverA {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiverA.class);

    //監聽器監聽指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("ReceiverA:"+hello);
    }

}

消費者2:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service
 * 類名:
 * 功能:消費者2
 * 備註:多生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:14
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class HelloReceiverB {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiverB.class);

    //監聽器監聽指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("ReceiverB:"+hello);
    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.manyToMany.HelloSenderA;
import com.example.demo.rabbitmq.service.manyToMany.HelloSenderB;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:《用一句描述一下》
 * 備註:多生產者-多消費者
 * 建立人:typ
 * 建立時間:2018/9/23 22:35
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitManyToManyTest {

    @Autowired
    private HelloSenderA helloSenderA;

    @Autowired
    private HelloSenderB helloSenderB;

    /**
     * 方法名:
     * 功能:多生產者-多消費者
     * 描述:
     * 建立人:typ
     * 建立時間:2018/9/23 22:46
     * 修改人:
     * 修改描述:
     * 修改時間:
     */
    @PostMapping("/manyToMany")
    public void ontToMany(){
        for (int i=0;i<10;i++){
            helloSenderA.send("hello smg:"+i);
            helloSenderB.send("hello smg:"+i);
        }
    }
}

7、實體類傳輸,springboot完美的支援物件的傳送和接收,不需要格外的配置。

實體類(必須實現序列化介面):

package com.example.demo.rabbitmq.service.entity;

import java.io.Serializable;

/**
 * 路徑:com.example.demo.rabbitmq.service.entity
 * 類名:
 * 功能:《用一句描述一下》
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 19:59
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
public class User implements Serializable{

    private String name;
    private String pass;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }
}

生產者:

package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.entity
 * 類名:
 * 功能:實體類傳輸
 * 備註:生產者
 * 建立人:typ
 * 建立時間:2018/9/24 20:01
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class UserSender {

    private static final Logger log = LoggerFactory.getLogger(UserSender.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        User user = new User();
        user.setName("test");
        user.setPass("123456");
        log.info("user Sender:" + user.getName() + "," + user.getPass());
        amqpTemplate.convertAndSend("user", user);
    }
}

消費者:

package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.entity
 * 類名:
 * 功能:實體類傳輸
 * 備註:消費者
 * 建立人:typ
 * 建立時間:2018/9/24 20:07
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "user")
public class UserReceiver {

    private static final Logger log = LoggerFactory.getLogger(UserReceiver.class);

    @RabbitHandler
    public void process(User user) {
        log.info("user Receive:" + user.getName() + "," + user.getPass());
    }
}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.entity.UserSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:實體類傳輸測試
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 20:09
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitUserTest {

    @Autowired
    private UserSender userSender;

    @PostMapping("/userTest")
    public void userTets(){
        userSender.send();
    }
}

8、topic ExChange示例

     topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列。首先對topic規則配置,這裡使用兩個佇列來測試(也就是在Application類中建立和繫結的topic.message和topic.messages兩個佇列),其中topic.message的bindting_key為“topic.message”,topic.messages的binding_key為“topic.#”。

生產者:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.topic
 * 類名:
 * 功能:topic ExChange示例------生產者
 * 備註:topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列
 * 建立人:typ
 * 建立時間:2018/9/24 20:12
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class TopicSender {

    private static final Logger log = LoggerFactory.getLogger(TopicSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg1 = "I am topic.mesaage msg1!";
        log.info("sender1 : " + msg1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg2!";
        log.info("sender2 : " + msg2);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }
}

消費者1:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.topic
 * 類名:
 * 功能:topic ExChange示例
 * 備註:消費者1(topic.message)
 * 建立人:typ
 * 建立時間:2018/9/24 20:12
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    private static final Logger log = LoggerFactory.getLogger(TopicReceiver1.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("topicReceiver1: " +msg);
    }

}

消費者2:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.topic
 * 類名:
 * 功能:topic ExChange示例
 * 備註:消費者2(topic.messages)
 * 建立人:typ
 * 建立時間:2018/9/24 20:12
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    private static final Logger log = LoggerFactory.getLogger(TopicReceiver2.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("topicReceiver2 : " +msg);
    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.topic.TopicSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:topic ExChange示例
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 20:21
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitTopicTest {

    @Autowired
    private TopicSender topicSender;

    @PostMapping("/topicTest")
    public void topicTest(){
        topicSender.send();
    }
}

9、fanout ExChange示例

Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。

生產者:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.fanout
 * 類名:
 * 功能:fanout ExChange示例---生產者
 * 備註:Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。
 * 建立人:typ
 * 建立時間:2018/9/24 21:10
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class FanoutSender {

    private static final Logger log = LoggerFactory.getLogger(FanoutSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg="fanoutSender :hello i am fanout";
        log.info(msg);
        this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msg);
    }
}

消費者1:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.fanout
 * 類名:
 * 功能:fanout ExChange示例
 * 備註:消費者A
 * 建立人:typ
 * 建立時間:2018/9/24 21:10
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverA.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverA  : " + msg);
    }

}

消費者2:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.fanout
 * 類名:
 * 功能:fanout ExChange示例
 * 備註:消費者B
 * 建立人:typ
 * 建立時間:2018/9/24 21:10
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverB.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverB  : " + msg);
    }

}

消費者3:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.fanout
 * 類名:
 * 功能:fanout ExChange示例
 * 備註:消費者C
 * 建立人:typ
 * 建立時間:2018/9/24 21:10
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverC.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverC  : " + msg);
    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.fanout.FanoutSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:fanout ExChange示例
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 22:11
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitFanoutTest {

    @Autowired
    private FanoutSender fanoutSender;

    @PostMapping("/fanoutTest")
    public void fanoutTest() {
        fanoutSender.send();
    }
}

10、callback的訊息傳送

       增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。

rabbitmq配置類:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

/**
 * 路徑:com.example.demo.rabbitmq.service.callback
 * 類名:
 * 功能:增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 20:09
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
public class RabbitConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);
    
    @Value("${spring.rabbitmq.host}")
    private String addresses;
    
    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;
    
    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);

        //如果要進行訊息回撥,則這裡必須要設定為true
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    //因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplatenew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}

生產者:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 路徑:com.example.demo.rabbitmq.service.callback
 * 類名:CallBackSender
 * 功能:callback的訊息傳送-----生產者
 * 建立人:typ
 * 建立時間:2018/9/24 20:09
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
public class CallBackSender implements  RabbitTemplate.ConfirmCallback{

    private static final Logger log = LoggerFactory.getLogger(CallBackSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplatenew;

    public void send() {
        rabbitTemplatenew.setConfirmCallback(this);
        String msg="callbackSender : i am callback sender";
        log.info(msg);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("callbackSender UUID: " + correlationData.getId());
        this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);  
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("callbakck confirm: " + correlationData.getId());
    }
}

消費者:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路徑:com.example.demo.rabbitmq.service.callback
 * 類名:
 * 功能:callback的訊息傳送
 * 備註:消費者
 * 建立人:typ
 * 建立時間:2018/9/24 20:12
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@Component
@RabbitListener(queues = "topic.messages")
public class CallBackReceiver {

    private static final Logger log = LoggerFactory.getLogger(CallBackReceiver.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("CallBackReceiver : " +msg);
    }

}

controller測試:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.callback.CallBackSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路徑:com.example.demo.rabbitmq.controller
 * 類名:
 * 功能:callback的訊息傳送
 * 備註:
 * 建立人:typ
 * 建立時間:2018/9/24 22:20
 * 修改人:
 * 修改備註:
 * 修改時間:
 */
@RestController
public class RabbitCallBackTest {

    @Autowired
    private CallBackSender callBackSender;

    //執行程式碼可以看出callbackSender發出的UUID,收到了迴應,又傳回來了。
    @PostMapping("/callback")
    public void callbak() {
        callBackSender.send();
    }
}