SpringBoot整合RabbitMQ之Topic Exchange萬用字元交換機(學習總結)
一、簡介
Topic Exchange交換機也叫萬用字元交換機,我們在傳送訊息到Topic Exchange的時候不能隨意指定route key(應該是由一系列點號連線的字串,一般會與binding key有關聯,route key的長度一般不能超過255個位元組)。同理,交換機與佇列之間的binding key也應該是點號連線成的字串,當訊息傳送者傳送資訊到Topic Exchage交換機的時候,這時候傳送訊息的route key會與binding key進行萬用字元匹配,所有匹配成功的訊息都會發送到訊息接受者。
Topic Exchange主要有兩種萬用字元:# 和 *
- *(星號):可以(只能)匹配一個單詞
- #(井號):可以匹配多個單詞(或者零個)
下面我們根據一張圖來理解一下Topic Exchange是怎麼匹配的:
【a】一條以” com.register.mail”為routing key的訊息將會匹配到Register Queue與SaveMail Queue兩個佇列上,所以訊息會發送到訊息接收者1和訊息接收者2。routing key為“email.register.test”的訊息同樣會被推送到Register Queue與SaveMail Queue兩個佇列。
【b】如果routing key為”com.register.wsh”的話,訊息只會被推送到Register Queue上;routing key為”email.com.wsh
注意:如果在傳送訊息的時候沒有匹配到符合條件的binding key,那麼這條訊息將會被廢棄。如:com.register.wsh.test 訊息不會被推送到Register Queue上,但是注意 email.com.wsh.test則可以推送到SaveMail Queue上。
- 當一個佇列被繫結為binding key為”#”時,它將會接收所有的訊息,這類似於廣播形式的交換機模式。
- 當binding key不包含”*”和”#”時,這類似於我們上一章說的Direct Exchange直連交換機模式。
二、準備工程
基於上一章的學習,我們已經對RabbitMQ有了一定的認識,這一章有些概念就不做詳細講述。這裡我們新建一個springboot工程:具體專案結構:
springboot_rabbitmq_topic_exchange : 埠1111
本章主要實現了會員註冊之後,傳送通知,然後訊息接收者接收到訊息之後進行儲存會員操作以及傳送註冊成功郵件案例。
需要注意引入amqp的RabbitMQ依賴,具體pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.springboot.wsh</groupId>
<artifactId>springboot_rabbitmq_topic_exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot_rabbitmq_topic_exchange</name>
<description>RabbitMQ Topic Exchange萬用字元交換機</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- JavaMail依賴 -->
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>1.4.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置檔案:配置RabbitMQ的一些訊息以及資料庫連線資訊等
server:
port: 1111
spring:
application:
name: rabbitmq_topic_exchange
rabbitmq:
host: localhost
virtual-host: /
username: guest
password: guest
publisher-confirms: true
port: 5672
datasource:
username: root
password: wsh0905
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/rabbitmq?characterEncoding=utf8
jpa:
database: MySQL
show-sql: true
hibernate:
naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy
三、資料庫相關操作類
這裡我們採用Spring Data JPA方式操作資料庫,我們先建一張member表,建表語句:
CREATE TABLE `member` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`username` varchar(20) DEFAULT NULL,
`password` varchar(20) DEFAULT NULL,
`email` varchar(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8
接著建立實體類Member:
/**
* @Title: Member
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 會員實體類
* @Author WeiShiHuai
* @Date 2018/9/21 14:53
*/
@Entity
@Table(name = "member")
public class Member implements Serializable {
@Id
@Column(name = "id")
@GeneratedValue
private Long id;
/**
* 使用者名稱
*/
@Column(name = "username")
private String username;
/**
* 密碼
*/
@Column(name = "password")
private String password;
/**
* 郵箱
*/
@Column(name = "email")
private String email;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}
MemberRepository: 注意是一個介面,繼承了JpaRepository,預設實現了增刪查改功能
public interface MemberRepository extends JpaRepository<Member,Long>{
}
MemberController:
@RestController
public class MemberController {
@Autowired
private MemberService memberService;
@RequestMapping("/registerMember")
public void registerMember() throws Exception {
Member member = new Member();
member.setUsername("weixiaohuai");
member.setPassword("123456");
member.setEmail("[email protected]");
memberService.memberRegister(member);
}
}
MemberService:
@Service
public class MemberService {
@Autowired
private MemberRegisterSender memberRegisterSender;
public Long memberRegister(Member member) throws Exception {
//會員註冊
memberRegisterSender.sendMessage(member);
return member.getId();
}
}
由於這些類相對簡單一點,就不多說了。
四、新建常量類Constants.java
這個類主要存放RabbitMQ 佇列名稱、交換機、以及兩者之間的route key。
/**
* @Title: Constants
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 常量類
* @Author WeiShiHuai
* @Date 2018/9/21 15:00
*/
public class Constants {
/**
* 訊息佇列-topic交換機名稱
*/
public static final String MEMBER_TOPIC_EXCHANGE_NAME = "rabbit_mq_topic_exchange_name";
/**
* 訊息佇列-註冊會員-佇列名稱
*/
public static final String MEMBER_REGISTER_QUEUE_NAME = "rabbit_mq_member_register_queue_name";
/**
* 訊息佇列-註冊會員-佇列路由鍵
*/
public static final String MEMBER_REGISTER_QUEUE_ROUTE_KEY = "register.*";
/**
* 訊息佇列-傳送郵件-佇列名稱
*/
public static final String MEMBER_SEND_MAIL_QUEUE_NAME = "rabbit_mq_member_send_mail_queue_name";
/**
* 訊息佇列-傳送郵件-佇列路由鍵
*/
public static final String MEMBER_SEND_MAIL_QUEUE_ROUTE_KEY = "register.#";
/**
* 訊息佇列-topic交換機-路由key
*/
public static final String MEMBER_TOPIC_EXCHANGE_ROUTE_KEY = "register.member";
/**
* 郵件文字型別 - HTML
*/
public static final String SEND_MAIL_HTML_TYPE = "text/html;charset=UTF-8";
/**
* 郵件文字型別 - TEXT
*/
public static final String SEND_MAIL_TEXT_TYPE = "text";
}
五、新建RabbitMQ相關配置類
這個類主要是建立了一個Topic Exchange萬用字元 交換機,一個註冊會員佇列以及與交換機的繫結操作、一個傳送郵件佇列以及與交換機進行繫結。並加入了日誌記錄各個物件例項的建立狀態。
/**
* @Title: MemberRegistrtRabbitMQConfiguration
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 會員註冊-RabitMQ-相關配置類
* @Author WeiShiHuai
* @Date 2018/9/21 15:10
*/
@Configuration
public class MemberRegistrtRabbitMQConfiguration {
private static Logger logger = LoggerFactory.getLogger(MemberRegistrtRabbitMQConfiguration.class);
/**
* 建立萬用字元交換機例項
*
* @return 萬用字元交換機例項
*/
@Bean
public TopicExchange topicExchange() {
TopicExchange topicExchange = new TopicExchange(Constants.MEMBER_TOPIC_EXCHANGE_NAME);
logger.info("【【【會員註冊萬用字元交換機例項建立成功】】】");
return topicExchange;
}
/**
* 建立會員註冊佇列例項,並持久化
*
* @return 會員註冊佇列例項
*/
@Bean
public Queue memberRegisterQueue() {
Queue memberRegisterQueue = new Queue(Constants.MEMBER_REGISTER_QUEUE_NAME, true);
logger.info("【【【會員註冊佇列例項建立成功】】】");
return memberRegisterQueue;
}
/**
* 建立會員傳送郵件佇列例項,並持久化
*
* @return 會員傳送郵件佇列例項
*/
@Bean
public Queue memberSendMailQueue() {
Queue memberRegisterQueue = new Queue(Constants.MEMBER_SEND_MAIL_QUEUE_NAME, true);
logger.info("【【【會員傳送郵件佇列例項建立成功】】】");
return memberRegisterQueue;
}
/**
* 繫結會員註冊佇列到交換機
*
* @return 繫結物件
*/
@Bean
public Binding memberRegisterBinding() {
Binding binding = BindingBuilder.bind(memberRegisterQueue()).to(topicExchange()).with(Constants.MEMBER_REGISTER_QUEUE_ROUTE_KEY);
logger.info("【【【會員註冊佇列與交換機繫結成功】】】");
return binding;
}
/**
* 繫結會員傳送郵件佇列到交換機
*
* @return 繫結物件
*/
@Bean
public Binding memberSendMailBinding() {
Binding binding = BindingBuilder.bind(memberSendMailQueue()).to(topicExchange()).with(Constants.MEMBER_SEND_MAIL_QUEUE_ROUTE_KEY);
logger.info("【【【會員傳送郵件佇列與交換機繫結成功】】】");
return binding;
}
}
六、新建訊息傳送者
主要是注入了RabbitTemplate,使用RabbitTemplate提供的convertAndSend方法進行訊息的傳送,傳入定義好的交換機、route-key以及傳送的訊息物件。
/**
* @Title: MemberRegisterSender
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 使用者註冊訊息傳送者
* @Author WeiShiHuai
* @Date 2018/9/21 15:33
*/
@Component
public class MemberRegisterSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 傳送會員註冊通知訊息
*
* @param message 訊息內容
*/
public void sendMessage(Member message) throws Exception {
rabbitTemplate.convertAndSend(Constants.MEMBER_TOPIC_EXCHANGE_NAME, Constants.MEMBER_TOPIC_EXCHANGE_ROUTE_KEY, message);
}
}
七、新建訊息接收者MemberRegisterReceiver
/**
* @Title: MemberRegisterReceiver
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 使用者註冊訊息接收者
* @Author WeiShiHuai
* @Date 2018/9/21 15:47
*/
@Component
@RabbitListener(queues = "rabbit_mq_member_register_queue_name")
public class MemberRegisterReceiver {
@Autowired
private MemberRepository memberRepository;
private static Logger logger = LoggerFactory.getLogger(MemberRegisterReceiver.class);
@RabbitHandler
@Transactional
public void handler(Member member) throws Exception {
logger.info("會員使用者名稱: {}, 註冊成功, 準備建立會員資訊...", member.getUsername());
//儲存會員訊息
memberRepository.save(member);
}
}
可以看到,我們使用@RabbitListener監聽了佇列名稱為rabbit_mq_member_register_queue_name的佇列,這個名稱需要對應Constants.MEMBER_REGISTER_QUEUE_NAME,否則訊息將不會成功接收。 我們使用@RabbitHandler註解進行訊息的處理,注入了MemberRepository進行儲存會員操作。
八、新建訊息接收者MemberSendMailReceiver
/**
* @Title: MemberRegisterReceiver
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 傳送郵件訊息接收者
* @Author WeiShiHuai
* @Date 2018/9/21 15:47
*/
@Component
@RabbitListener(queues = "rabbit_mq_member_send_mail_queue_name")
public class MemberSendMailReceiver {
private static Logger logger = LoggerFactory.getLogger(MemberSendMailReceiver.class);
@Transactional
@RabbitHandler
public void sendMail(Member member) throws Exception {
logger.info("會員使用者名稱:{},註冊成功,準備傳送郵件...", member.getUsername());
//執行傳送郵件操作
new EMailSender()
.setTitle("會員註冊成功通知郵件")
.setContent("恭喜你,你已註冊成為我們的會員")
.setContentType(Constants.SEND_MAIL_TEXT_TYPE)
.setSendMailTargets(new ArrayList<String>() {{
add("[email protected]");
}}).send();
}
}
同理注意監聽的佇列名稱需要與建立佇列時的名稱一致等。從上圖看到,我們呼叫了傳送郵件的方法,這裡採用Java Mail實現傳送郵件的簡單功能,主要是演示RabbitMQ, 點到為止即可。下面直接貼一下相關程式碼:
【a】郵件傳送者物件MailEntity.java
/**
* @Title: MailEntity
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 傳送郵件時需要的引數欄位
* @Author WeiShiHuai
* @Date 2018/9/21 22:53
*/
public class MailEntity implements Serializable {
/**
* SMTP伺服器
*/
private String smtpService;
/**
* 埠號
*/
private String smtpPort;
/**
* 傳送郵箱
*/
private String fromMailAddress;
/**
* 傳送郵箱的STMP口令
*/
private String fromMailStmpPwd;
/**
* 郵件標題
*/
private String title;
/**
* 郵件內容
*/
private String content;
/**
* 內容格式(預設採用html)
*/
private String contentType;
/**
* 接受郵件地址集合
*/
private List<String> list = new ArrayList<>();
public String getSmtpService() {
return smtpService;
}
public void setSmtpService(String smtpService) {
this.smtpService = smtpService;
}
public String getSmtpPort() {
return smtpPort;
}
public void setSmtpPort(String smtpPort) {
this.smtpPort = smtpPort;
}
public String getFromMailAddress() {
return fromMailAddress;
}
public void setFromMailAddress(String fromMailAddress) {
this.fromMailAddress = fromMailAddress;
}
public String getFromMailStmpPwd() {
return fromMailStmpPwd;
}
public void setFromMailStmpPwd(String fromMailStmpPwd) {
this.fromMailStmpPwd = fromMailStmpPwd;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getContentType() {
return contentType;
}
public void setContentType(String contentType) {
this.contentType = contentType;
}
public List<String> getList() {
return list;
}
public void setList(List<String> list) {
this.list = list;
}
}
【b】郵件配置檔案以及讀取配置檔案工具類
mail.properties:位於resources目錄下,主要配置授權碼以及郵件傳送方地址,顯示暱稱等資訊
send.mail.smtp.service=smtp.qq.com
send.mail.smtp.prot=587
#郵件傳送方地址
[email protected]
#郵件傳送方授權碼
send.mail.from.smtp.pwd=對應你qq郵箱的授權碼
#郵件傳送的時候顯示的暱稱
send.mail.from.nickname=weixiaohuai
PropertiesUtils:
/**
* @Title: PropertiesUtils
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 工具類
* @Author WeiShiHuai
* @Date 2018/9/21 22:54
*/
public class PropertiesUtils {
private final ResourceBundle resource;
private final String fileName;
/**
* 建構函式例項化部分物件,獲取檔案資源物件
*
* @param fileName
*/
public PropertiesUtils(String fileName) {
this.fileName = fileName;
Locale locale = new Locale("zh", "CN");
this.resource = ResourceBundle.getBundle(this.fileName, locale);
}
/**
* 根據傳入的key獲取物件的值
*
* @param key properties檔案對應的key
* @return String 解析後的對應key的值
*/
public String getValue(String key) {
return this.resource.getString(key);
}
/**
* 獲取properties檔案內的所有key值
*
* @return
*/
public Enumeration<String> getKeys() {
return resource.getKeys();
}
}
【c】郵件傳送邏輯EmailSender.java
package com.springboot.wsh.mail;
import com.springboot.wsh.constants.Constants;
import com.springboot.wsh.entity.MailEntity;
import com.springboot.wsh.utils.PropertiesUtils;
import org.apache.commons.lang.StringUtils;
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeUtility;
import java.util.List;
import java.util.Properties;
/**
* @Title: EMailSender
* @ProjectName springboot_rabbitmq_topic_exchange
* @Description: 郵件傳送者實體,主要作用就是用來配置傳送郵件引數以及執行傳送郵件操作
* @Author WeiShiHuai
* @Date 2018/9/21 22:27
*/
public class EMailSender {
/**
* 郵件實體
*/
private static MailEntity mail = new MailEntity();
/**
* 設定郵件標題
*
* @param title 標題資訊
* @return
*/
public EMailSender setTitle(String title) {
mail.setTitle(title);
return this;
}
/**
* 設定郵件內容
*
* @param content
* @return
*/
public EMailSender setContent(String content) {
mail.setContent(content);
return this;
}
/**
* 設定郵件格式
*
* @param typeEnum
* @return
*/
public EMailSender setContentType(String typeEnum) {
mail.setContentType(typeEnum);
return this;
}
/**
* 設定請求目標郵件地址
*
* @param targets
* @return
*/
public EMailSender setSendMailTargets(List<String> targets) {
mail.setList(targets);
return this;
}
/**
* 執行傳送郵件
*
* @throws Exception 如果傳送失敗會丟擲異常資訊
*/
public void send() throws Exception {
//校驗傳送郵件物件引數是否設定
this.checkMailParams(mail);
//讀取/resource/mail.properties檔案內容
final PropertiesUtils properties = new PropertiesUtils("mail");
// 建立Properties 類用於記錄郵箱的一些屬性
final Properties props = new Properties();
// 表示SMTP傳送郵件,進行身份驗證
props.put("mail.smtp.auth", "true");
//此處填寫SMTP伺服器
props.put("mail.smtp.host", properties.getValue("send.mail.smtp.service"));
//設定埠號,QQ郵箱兩個埠465/587
props.put("mail.smtp.port", properties.getValue("send.mail.smtp.prot"));
// 設定傳送郵箱
props.put("mail.user", properties.getValue("send.mail.from.address"));
// 設定授權碼
props.put("mail.password", properties.getValue("send.mail.from.smtp.pwd"));
// 構建授權資訊,用於進行SMTP進行身份驗證
Authenticator authenticator = new Authenticator() {
protected PasswordAuthentication getPasswordAuthentication() {
// 使用者名稱、密碼
String userName = props.getProperty("mail.user");
String password = props.getProperty("mail.password");
return new PasswordAuthentication(userName, password);
}
};
// 使用環境屬性和授權資訊,建立郵件會話
Session mailSession = Session.getInstance(props, authenticator);
// 建立郵件訊息
MimeMessage message = new MimeMessage(mailSession);
// 設定發件人
String nickName = MimeUtility.encodeText(properties.getValue("send.mail.from.nickname"));
InternetAddress form = new InternetAddress(nickName + " <" + props.getProperty("mail.user") + ">");
message.setFrom(form);
// 設定郵件標題
message.setSubject(mail.getTitle());
//html傳送郵件
if (mail.getContentType().equals(Constants.SEND_MAIL_HTML_TYPE)) {
// 設定郵件的內容體 預設使用html方式傳送
message.setContent(mail.getContent(), StringUtils.isBlank(mail.getContentType()) ? Constants.SEND_MAIL_HTML_TYPE : mail.getContentType());
} else if (mail.getContentType().equals(Constants.SEND_MAIL_TEXT_TYPE)) {
// Text文字方式傳送
message.setText(mail.getContent());
}
//傳送郵箱地址
List<String> targets = mail.getList();
for (String target : targets) {
try {
// 設定收件人的郵箱
InternetAddress to = new InternetAddress(target);
message.setRecipient(Message.RecipientType.TO, to);
// 傳送郵件
Transport.send(message);
} catch (Exception e) {
continue;
}
}
}
/**
* 校驗傳送郵件的一些引數是否設定
*
* @param mail 郵件傳送物件
* @throws Exception
*/
private void checkMailParams(MailEntity mail) throws Exception {
if (StringUtils.isBlank(mail.getTitle())) {
throw new Exception("抱歉,郵件標題不能為空,請先設定郵件標題!");
}
if (StringUtils.isBlank(mail.getContent())) {
throw new Exception("抱歉,郵件內容不能為空,請先設定郵件內容!");
}
if (mail.getList().size() == 0) {
throw new Exception("抱歉,郵件接收方不能為空,請先設定郵件接收目標物件!");
}
}
}
【d】登入郵件傳送方qq郵件,對應你配置檔案上寫的郵箱地址,開啟POP3/SMTP服務:
【e】生成授權碼
這樣郵件相關的設定就已經完成了,下面我們進行測試一下。
九、啟動專案
可以看到我們RabbitMQ已經啟動成功並且交換機、兩個佇列也成功繫結,訪問http://localhost:1111/registerMember,因為我們再兩個佇列分別進行了儲存資料庫操作以及傳送郵件操作,
可以看到,訊息接受者已經成功接收到訊息傳送者的訊息,並且進行了相應的處理。現在查詢資料庫中是否有對應的會員資訊:
可以看到,成功執行了儲存會員的操作。
下面我們登入郵件接收方[email protected],
可以看到成功接收到了郵件傳送方傳送的註冊成功通知郵件。
十、總結
Topic Exchange是一個通過萬用字元匹配的交換機模式,如果route key與繫結的key不一致的話,訊息會被丟棄,這樣訊息接收者也不能接收到傳送者傳送的訊息。本文是在作者我學習Topic Exchange的時候的一些總結以及實踐,能力有限,僅供大家參考學習,共同學習,共同進步!