Springboot+RabbitMQ整合示例
一、RabbitMQ簡介
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。
1、MQ特點: MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA
2、含義:RabbitMQ是一個在AMQP基礎上完成的,可複用的企業訊息系統。他遵循Mozilla Public License開源協議。
3、概念:RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文件會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹。
RabbitMQ的結構圖如下:
Broker:簡單來說就是訊息佇列伺服器實體。
- Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
- Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
- Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
- Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
- vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
- producer:訊息生產者,就是投遞訊息的程式。
- consumer:訊息消費者,就是接受訊息的程式。
- channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
訊息佇列的使用過程大概如下:
- 客戶端連線到訊息佇列伺服器,開啟一個channel。
- 客戶端宣告一個exchange,並設定相關屬性。
- 客戶端宣告一個queue,並設定相關屬性。
- 客戶端使用routing key,在exchange和queue之間建立好繫結關係。
- 客戶端投遞訊息到exchange。
exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。
exchange也有幾個型別,完全根據key進行投遞的叫做Direct交換機,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。
RabbitMQ支援訊息的持久化,也就是資料寫在磁碟上,為了資料安全考慮,我想大多數使用者都會選擇持久化。訊息佇列持久化包括3個部分:
- exchange持久化,在宣告時指定durable => 1
- queue持久化,在宣告時指定durable => 1
- 訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立繫結。
二、Springboot整合RabbitMQ
1、新增pom.xml依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.demo.rabbitmq</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、application.yml配置檔案主要是對rabbimq的配置資訊
server:
port: 8081
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: 192.168.0.132
port: 5672
username: admin
password: admin
3、初始化建立佇列、轉發器,並把佇列繫結到轉發器(RabbitConfig.java)
package com.example.demo.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 路徑:com.example.demo.rabbitmq.config
* 類名:
* 功能:佇列配置
* 備註:
* 建立人:typ
* 建立時間:2018/9/23 21:46
* 修改人:
* 修改備註:
* 修改時間:
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public Queue userQueue() {
return new Queue("user");
}
//===============以下是驗證topic Exchange的佇列==========
@Bean
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean
public Queue queueMessages() {
return new Queue("topic.messages");
}
//===============以上是驗證topic Exchange的佇列==========
//===============以下是驗證Fanout Exchange的佇列==========
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//===============以上是驗證Fanout Exchange的佇列==========
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 將佇列topic.message與exchange繫結,binding_key為topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 將佇列topic.messages與exchange繫結,binding_key為topic.#,模糊匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
4、最簡單的hello生產和消費實現(單生產者和單消費者)
生產者:
package com.example.demo.rabbitmq.service.oneToOne;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:生產者
* 備註:單生產者-單消費者
* 建立人:typ
* 建立時間:2018/9/23 21:49
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloSender {
private static final Logger log = LoggerFactory.getLogger(HelloSender.class);
@Autowired
public AmqpTemplate amqpTemplate;
public void send(){
String context = "hello " + new Date();
log.info("Sender:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}
消費者:
package com.example.demo.rabbitmq.service.oneToOne;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:消費者
* 備註:單生產者-單消費者
* 建立人:typ
* 建立時間:2018/9/23 22:14
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloReceiver {
private static final Logger log = LoggerFactory.getLogger(HelloReceiver.class);
//監聽器監聽指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver:"+hello);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.oneToOne.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:《用一句描述一下》
* 備註:單生產者-單消費者
* 建立人:typ
* 建立時間:2018/9/23 22:35
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitOneToOneTest {
@Autowired
private HelloSender helloSender;
@PostMapping("/hello")
public void hello(){
helloSender.send();
}
}
啟動程式,執行:
結果如下:
Sender : hello1 Thu September 24 17:23:31 CST 2018
Receiver : hello1 Thu September 24 17:23:31 CST 2018
5、單生產者-多消費者
生產者:
package com.example.demo.rabbitmq.service.oneToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:生產者
* 備註:單生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 21:49
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloSender1 {
private static final Logger log = LoggerFactory.getLogger(HelloSender1.class);
@Autowired
public AmqpTemplate amqpTemplate;
public void send(String msg){
String context = msg + new Date();
log.info("Sender1:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}
消費者1:
package com.example.demo.rabbitmq.service.oneToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:消費者1
* 備註:單生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:14
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloReceiver1 {
private static final Logger log = LoggerFactory.getLogger(HelloReceiver1.class);
//監聽器監聽指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver1:"+hello);
}
}
消費者2:
package com.example.demo.rabbitmq.service.oneToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:消費者2
* 備註:單生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:14
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloReceiver2 {
private static final Logger log = LoggerFactory.getLogger(HelloReceiver2.class);
//監聽器監聽指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver2:"+hello);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.oneToMany.HelloSender1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:《用一句描述一下》
* 備註:單生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:35
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitOneToManyTest {
@Autowired
private HelloSender1 helloSender;
/**
* 方法名:
* 功能:單生產者-多消費者
* 描述:
* 建立人:typ
* 建立時間:2018/9/23 22:46
* 修改人:
* 修改描述:
* 修改時間:
*/
@PostMapping("/oneToMany")
public void ontToMany(){
for (int i=0;i<10;i++){
helloSender.send("hello smg:"+i);
}
}
}
6、多生產者-多消費者
生產者1:
package com.example.demo.rabbitmq.service.manyToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:生產者1
* 備註:多生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 21:49
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloSenderA {
private static final Logger log = LoggerFactory.getLogger(HelloSenderA.class);
@Autowired
public AmqpTemplate amqpTemplate;
public void send(String msg){
String context = msg + new Date();
log.info("SenderA:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}
生產者2:
package com.example.demo.rabbitmq.service.manyToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:生產者2
* 備註:多生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 21:49
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloSenderB {
private static final Logger log = LoggerFactory.getLogger(HelloSenderB.class);
@Autowired
public AmqpTemplate amqpTemplate;
public void send(String msg){
String context = msg + new Date();
log.info("SenderB:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}
消費者1:
package com.example.demo.rabbitmq.service.manyToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:消費者1
* 備註:多生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:14
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloReceiverA {
private static final Logger log = LoggerFactory.getLogger(HelloReceiverA.class);
//監聽器監聽指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("ReceiverA:"+hello);
}
}
消費者2:
package com.example.demo.rabbitmq.service.manyToMany;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service
* 類名:
* 功能:消費者2
* 備註:多生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:14
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class HelloReceiverB {
private static final Logger log = LoggerFactory.getLogger(HelloReceiverB.class);
//監聽器監聽指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("ReceiverB:"+hello);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.manyToMany.HelloSenderA;
import com.example.demo.rabbitmq.service.manyToMany.HelloSenderB;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:《用一句描述一下》
* 備註:多生產者-多消費者
* 建立人:typ
* 建立時間:2018/9/23 22:35
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitManyToManyTest {
@Autowired
private HelloSenderA helloSenderA;
@Autowired
private HelloSenderB helloSenderB;
/**
* 方法名:
* 功能:多生產者-多消費者
* 描述:
* 建立人:typ
* 建立時間:2018/9/23 22:46
* 修改人:
* 修改描述:
* 修改時間:
*/
@PostMapping("/manyToMany")
public void ontToMany(){
for (int i=0;i<10;i++){
helloSenderA.send("hello smg:"+i);
helloSenderB.send("hello smg:"+i);
}
}
}
7、實體類傳輸,springboot完美的支援物件的傳送和接收,不需要格外的配置。
實體類(必須實現序列化介面):
package com.example.demo.rabbitmq.service.entity;
import java.io.Serializable;
/**
* 路徑:com.example.demo.rabbitmq.service.entity
* 類名:
* 功能:《用一句描述一下》
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 19:59
* 修改人:
* 修改備註:
* 修改時間:
*/
public class User implements Serializable{
private String name;
private String pass;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
}
生產者:
package com.example.demo.rabbitmq.service.entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.entity
* 類名:
* 功能:實體類傳輸
* 備註:生產者
* 建立人:typ
* 建立時間:2018/9/24 20:01
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class UserSender {
private static final Logger log = LoggerFactory.getLogger(UserSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
User user = new User();
user.setName("test");
user.setPass("123456");
log.info("user Sender:" + user.getName() + "," + user.getPass());
amqpTemplate.convertAndSend("user", user);
}
}
消費者:
package com.example.demo.rabbitmq.service.entity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.entity
* 類名:
* 功能:實體類傳輸
* 備註:消費者
* 建立人:typ
* 建立時間:2018/9/24 20:07
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "user")
public class UserReceiver {
private static final Logger log = LoggerFactory.getLogger(UserReceiver.class);
@RabbitHandler
public void process(User user) {
log.info("user Receive:" + user.getName() + "," + user.getPass());
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.entity.UserSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:實體類傳輸測試
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 20:09
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitUserTest {
@Autowired
private UserSender userSender;
@PostMapping("/userTest")
public void userTets(){
userSender.send();
}
}
8、topic ExChange示例
topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列。首先對topic規則配置,這裡使用兩個佇列來測試(也就是在Application類中建立和繫結的topic.message和topic.messages兩個佇列),其中topic.message的bindting_key為“topic.message”,topic.messages的binding_key為“topic.#”。
生產者:
package com.example.demo.rabbitmq.service.topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.topic
* 類名:
* 功能:topic ExChange示例------生產者
* 備註:topic 是RabbitMQ中最靈活的一種方式,可以根據binding_key自由的繫結不同的佇列
* 建立人:typ
* 建立時間:2018/9/24 20:12
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class TopicSender {
private static final Logger log = LoggerFactory.getLogger(TopicSender.class);
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg1 = "I am topic.mesaage msg1!";
log.info("sender1 : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
String msg2 = "I am topic.mesaages msg2!";
log.info("sender2 : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
}
消費者1:
package com.example.demo.rabbitmq.service.topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.topic
* 類名:
* 功能:topic ExChange示例
* 備註:消費者1(topic.message)
* 建立人:typ
* 建立時間:2018/9/24 20:12
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
private static final Logger log = LoggerFactory.getLogger(TopicReceiver1.class);
@RabbitHandler
public void process(String msg) {
log.info("topicReceiver1: " +msg);
}
}
消費者2:
package com.example.demo.rabbitmq.service.topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.topic
* 類名:
* 功能:topic ExChange示例
* 備註:消費者2(topic.messages)
* 建立人:typ
* 建立時間:2018/9/24 20:12
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
private static final Logger log = LoggerFactory.getLogger(TopicReceiver2.class);
@RabbitHandler
public void process(String msg) {
log.info("topicReceiver2 : " +msg);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.topic.TopicSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:topic ExChange示例
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 20:21
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitTopicTest {
@Autowired
private TopicSender topicSender;
@PostMapping("/topicTest")
public void topicTest(){
topicSender.send();
}
}
9、fanout ExChange示例
Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。
生產者:
package com.example.demo.rabbitmq.service.fanout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.fanout
* 類名:
* 功能:fanout ExChange示例---生產者
* 備註:Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout轉發器傳送訊息,綁定了這個轉發器的所有佇列都收到這個訊息。
* 建立人:typ
* 建立時間:2018/9/24 21:10
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class FanoutSender {
private static final Logger log = LoggerFactory.getLogger(FanoutSender.class);
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg="fanoutSender :hello i am fanout";
log.info(msg);
this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msg);
}
}
消費者1:
package com.example.demo.rabbitmq.service.fanout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.fanout
* 類名:
* 功能:fanout ExChange示例
* 備註:消費者A
* 建立人:typ
* 建立時間:2018/9/24 21:10
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
private static final Logger log = LoggerFactory.getLogger(FanoutReceiverA.class);
@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverA : " + msg);
}
}
消費者2:
package com.example.demo.rabbitmq.service.fanout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.fanout
* 類名:
* 功能:fanout ExChange示例
* 備註:消費者B
* 建立人:typ
* 建立時間:2018/9/24 21:10
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
private static final Logger log = LoggerFactory.getLogger(FanoutReceiverB.class);
@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverB : " + msg);
}
}
消費者3:
package com.example.demo.rabbitmq.service.fanout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.fanout
* 類名:
* 功能:fanout ExChange示例
* 備註:消費者C
* 建立人:typ
* 建立時間:2018/9/24 21:10
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
private static final Logger log = LoggerFactory.getLogger(FanoutReceiverC.class);
@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverC : " + msg);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.fanout.FanoutSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:fanout ExChange示例
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 22:11
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitFanoutTest {
@Autowired
private FanoutSender fanoutSender;
@PostMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
}
10、callback的訊息傳送
增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。
rabbitmq配置類:
package com.example.demo.rabbitmq.service.callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;
/**
* 路徑:com.example.demo.rabbitmq.service.callback
* 類名:
* 功能:增加回調處理,這裡不再使用application.properties預設配置的方式,會在程式中顯示的使用檔案中的配置資訊。
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 20:09
* 修改人:
* 修改備註:
* 修改時間:
*/
public class RabbitConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses+":"+port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//如果要進行訊息回撥,則這裡必須要設定為true
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
//因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplatenew() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
生產者:
package com.example.demo.rabbitmq.service.callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* 路徑:com.example.demo.rabbitmq.service.callback
* 類名:CallBackSender
* 功能:callback的訊息傳送-----生產者
* 建立人:typ
* 建立時間:2018/9/24 20:09
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
public class CallBackSender implements RabbitTemplate.ConfirmCallback{
private static final Logger log = LoggerFactory.getLogger(CallBackSender.class);
@Autowired
private RabbitTemplate rabbitTemplatenew;
public void send() {
rabbitTemplatenew.setConfirmCallback(this);
String msg="callbackSender : i am callback sender";
log.info(msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("callbackSender UUID: " + correlationData.getId());
this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("callbakck confirm: " + correlationData.getId());
}
}
消費者:
package com.example.demo.rabbitmq.service.callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路徑:com.example.demo.rabbitmq.service.callback
* 類名:
* 功能:callback的訊息傳送
* 備註:消費者
* 建立人:typ
* 建立時間:2018/9/24 20:12
* 修改人:
* 修改備註:
* 修改時間:
*/
@Component
@RabbitListener(queues = "topic.messages")
public class CallBackReceiver {
private static final Logger log = LoggerFactory.getLogger(CallBackReceiver.class);
@RabbitHandler
public void process(String msg) {
log.info("CallBackReceiver : " +msg);
}
}
controller測試:
package com.example.demo.rabbitmq.controller;
import com.example.demo.rabbitmq.service.callback.CallBackSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 路徑:com.example.demo.rabbitmq.controller
* 類名:
* 功能:callback的訊息傳送
* 備註:
* 建立人:typ
* 建立時間:2018/9/24 22:20
* 修改人:
* 修改備註:
* 修改時間:
*/
@RestController
public class RabbitCallBackTest {
@Autowired
private CallBackSender callBackSender;
//執行程式碼可以看出callbackSender發出的UUID,收到了迴應,又傳回來了。
@PostMapping("/callback")
public void callbak() {
callBackSender.send();
}
}