SpringBoot使用RabbitMQ訊息佇列
RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
RabbitMQ基本概念
1.Message
訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
2.Publisher
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
3.Exchange交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。
4.Binding
繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。
5.Queue
訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
6.Connection
網路連線,比如一個TCP連線。
7.Channel
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內地虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。
8.Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
9.Virtual Host
虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。
10.Broker
表示訊息佇列伺服器實體。
Exchange 型別
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。下面只講前三種模式。
1.Direct模式
訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配
2.Topic模式
topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞。
3.Fanout模式
每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。SpringBoot整合RabbitMQ
在pom.xml
中新增 spring-boot-starter-amqp
的依賴 <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
在 application.yml
檔案中配置rabbitmq
相關內容
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
使用Direct模式
1.配置佇列
package com.lzc.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
static final String QUEUE = "direct_queue";
/**
* Direct模式
* @return
*/
@Bean
public Queue directQueue() {
// 第一個引數是佇列名字, 第二個引數是指是否持久化
return new Queue(QUEUE, true);
}
}
2.建立一個User實體類
package com.lzc.rabbitmq.dataobject;
import lombok.Data;
import java.io.Serializable;
@Data
public class User implements Serializable {
private static final long serialVersionUID = -1262627851741431084L;
private String userId;
private String name;
}
3.接收者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Receiver {
// queues是指要監聽的佇列的名字
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void receiverDirectQueue(User user) {
log.info("【receiverDirectQueue監聽到訊息】" + user.toString());
}
}
4.傳送者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDirectQueue() {
User user = new User();
user.setUserId("123456");
user.setName("lizhencheng");
log.info("【sendDirectQueue已傳送訊息】");
// 第一個引數是指要傳送到哪個佇列裡面, 第二個引數是指要傳送的內容
this.amqpTemplate.convertAndSend(RabbitMQConfig.QUEUE, user);
}
}
5.測試,訪問http://localhost:8080/sendDirectQueue,檢視日誌輸出
package com.lzc.rabbitmq.controller;
import com.lzc.rabbitmq.config.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Sender sender;
@GetMapping("/sendDirectQueue")
public Object sendDirectQueue() {
sender.sendDirectQueue();
return "ok";
}
}
6.日誌輸出
2018-06-18 01:18:54.901 INFO 8772 --- [io-8080-exec-10] com.lzc.rabbitmq.config.Sender : 【sendDirectQueue已傳送訊息】
2018-06-18 01:18:54.997 INFO 8772 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiverDirectQueue監聽到訊息】User(userId=123456, name=lizhencheng)
注意:傳送者與接收者的Queue名字一定要相同,否則接收收不到訊息
使用Topic模式
1.配置佇列
package com.lzc.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
/**
* Topic模式
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("lzc.message");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lzc.#");
}
}
2.建立一個User實體類(和上面一樣)
3.接收者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Receiver {
// queues是指要監聽的佇列的名字
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
public void receiveTopic1(User user) {
log.info("【receiveTopic1監聽到訊息】" + user.toString());
}
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
public void receiveTopic2(User user) {
log.info("【receiveTopic2監聽到訊息】" + user.toString());
}
}
4.傳送者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendTopic() {
User user1 = new User();
user1.setUserId("123456");
user1.setName("lizhencheng");
User user2 = new User();
user2.setUserId("456789");
user2.setName("張三");
log.info("【sendTopic已傳送訊息】");
// 第一個引數:TopicExchange名字
// 第二個引數:Route-Key
// 第三個引數:要傳送的內容
this.amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "lzc.message", user1 );
this.amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "lzc.lzc", user2);
}
}
5.測試,訪問http://localhost:8080/sendTopic,檢視日誌輸出
package com.lzc.rabbitmq.controller;
import com.lzc.rabbitmq.config.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Sender sender;
@GetMapping("/sendTopic")
public Object sendTopic() {
sender.sendTopic();
return "ok";
}
}
6.日誌輸出
2018-06-18 09:48:24.983 INFO 6504 --- [nio-8080-exec-1] com.lzc.rabbitmq.config.Sender : 【sendTopic已傳送訊息】
2018-06-18 09:48:25.048 INFO 6504 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiveTopic2監聽到訊息】User(userId=123456, name=lizhencheng)
2018-06-18 09:48:25.048 INFO 6504 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiveTopic1監聽到訊息】User(userId=123456, name=lizhencheng)
2018-06-18 09:48:25.049 INFO 6504 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiveTopic2監聽到訊息】User(userId=456789, name=張三)
由日誌記錄可以看出lzc.message可以被receiveTopic1和receiveTopic2所接收,而lzc.lzc只能被receiveTopic2接收。
使用Fanout模式
1.配置佇列
package com.lzc.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
/**
* 這裡為了方便就沒有建立新的隊列了,直接使用topic時所建立的佇列。
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2);
}
/**
* Fanout模式
* Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送訊息,綁定了這個交換機的所有佇列都收到這個訊息。
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
}
}
2.建立一個User實體類(和上面一樣)
3.接收者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Receiver {
// queues是指要監聽的佇列的名字
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
public void receiveTopic1(User user) {
log.info("【receiveTopic1監聽到訊息】" + user.toString());
}
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
public void receiveTopic2(User user) {
log.info("【receiveTopic2監聽到訊息】" + user.toString());
}
}
4.傳送者
package com.lzc.rabbitmq.config;
import com.lzc.rabbitmq.dataobject.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendFanout() {
User user = new User();
user.setUserId("123456");
user.setName("lizhencheng");
log.info("【sendFanout已傳送訊息】");
// 注意, 這裡的第2個引數為空。
// 因為fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,
// 每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上
this.amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, "", user );
}
}
5.測試,訪問http://localhost:8080/sendFanout,檢視日誌輸出
package com.lzc.rabbitmq.controller;
import com.lzc.rabbitmq.config.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Sender sender;
@GetMapping("/sendFanout")
public Object sendFanout() {
sender.sendFanout();
return "ok";
}
}
6.日誌輸出
2018-06-18 02:04:47.817 INFO 5848 --- [nio-8080-exec-1] com.lzc.rabbitmq.config.Sender : 【sendFanout已傳送訊息】
2018-06-18 02:04:47.845 INFO 5848 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiveTopic1監聽到訊息】User(userId=123456, name=lizhencheng)
2018-06-18 02:04:47.845 INFO 5848 --- [cTaskExecutor-1] com.lzc.rabbitmq.config.Receiver : 【receiveTopic2監聽到訊息】User(userId=123456, name=lizhencheng)
由日誌輸出可以看出,兩個接收者都接收到了訊息,因為交換機FANOUT_EXCHANGE綁定了兩個佇列。