二十一、SpringBoot之RabbitMQ
一、RabbitMQ核心概念
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
- Message
訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
- Publisher
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
- Exchange
交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別。
- Queue
訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
- Binding
繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。
Exchange 和Queue的繫結可以是多對多的關係。
- Connection
網路連線,比如一個TCP連線。
- Channel
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條TCP連線。
- Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
- Virtual Host
虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定, RabbitMQ 預設的 vhost 是 / 。
- Broker
表示訊息佇列伺服器實體
二、RabbitMQ執行機制
1、AMQP中的訊息路由
AMQP 中訊息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把訊息釋出到 Exchange 上,訊息最終到達佇列並被消費者接收,而 Binding 決定交換器的訊息應該傳送到哪個佇列。
2、Exchange 型別
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種類型: direct、fanout、topic、headers 。headers 匹配 AMQP 訊息的 header 而不是路由鍵, headers 交換器和 direct 交換器完全一致,但效能差很多, 目前幾乎用不到了,所以直接看另外三種類型:
- Direct Exchange(點對點模式):
訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列繫結到交換機要求路由鍵為 “dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。
- Fanout Exchange(廣播模式):
每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵, 只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。
- Topic Exchange:
topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。 它同樣也會識別兩個萬用字元:符號“#”和符號 “*”。#匹配0個或多個單詞,*匹配一個單詞。
三、測試三種類型Exchange
1、登陸RabbitMQ
地址:ip地址:15672
使用者名稱:guest 密碼:guest
2、建立交換器
建立3個交換器,分別為Direct 、Fanout 、Topic 型別。
3、建立佇列
建立四個佇列。
4、將交換器和佇列繫結
每個交換器都把四個佇列繫結上。
5.傳送訊息
- Direct Exchange(點對點模式)傳送訊息:
佇列獲取訊息
- Fanout Exchange(廣播模式)傳送訊息:
佇列獲取資訊
- Topic Exchange傳送訊息:
佇列獲取資訊
四、RabbitMQ整合
1. 引入 spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.yml配置
#主機地址
spring.rabbitmq.host=192.168.3.224
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
3. 測試RabbitMQ
- RabbitTemplate:訊息傳送處理元件
bean
public class Book {
private String bookName;
private String author;
config
//自定義MessageConverter:將資料自動轉為josn傳送出去
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
test
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 傳送訊息
* 如何將資料自動轉為josn傳送出去:自定義messageConverter
* */
@Test
public void contextLoads() {
//Message需要自己構造一個;定義訊息體內容和訊息頭
//rabbitTemplate.send(exchange,routeKey,message);
//1、單播(點對點)
//object預設當成訊息體,只需要傳入要傳送的物件,自動序列化傳送給rabbitmq;
//rabbitTemplate.convertAndSend(exchange,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg","這是第一個訊息");
map.put("data",Arrays.asList("helloworld",123,true));
//物件被預設序列化以後傳送出去
//rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西遊記","吳承恩"));
//2、廣播
rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三國演義","羅貫中"));
}
/**
* 接收訊息
* */
@Test
public void receive() {
Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
System.out.println(o.getClass());
System.out.println(o);
}
}
測試結果
- 監聽訊息佇列的內容
@EnableRabbit //開啟基於註解的RabbitMQ
@SpringBootApplication
public class Springboot02AmqpApplication {
public static void main(String[] args) {
SpringApplication.run(Springboot02AmqpApplication.class, args);
}
}
@Service
public class BookService {
//監聽佇列訊息
@RabbitListener(queues = "atguigu.news")
public void receive(Book book){
System.out.println("收到訊息:"+book);
}
//監聽訊息相關資訊
@RabbitListener(queues = "atguigu.news")
public void receive02(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}
- AmqpAdmin:建立和刪除Queue、Exchange、Binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
//建立exchange
//amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
//建立queue
//amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
//建立Binding
amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqpa.haha",null));
System.out.println("建立完成");
}
五、原理
自動配置類RabbitAutoConfiguration.java
- 有自動配置類連線工廠CachingConnectionFactory
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
//有自動配置類連線工廠
@Bean
public CachingConnectionFactory rabbitConnectionFactory(
RabbitProperties properties,
ObjectProvider<ConnectionNameStrategy> connectionNameStrategy)
throws Exception {
PropertyMapper map = PropertyMapper.get();
//配置連線工廠屬性
CachingConnectionFactory factory = new CachingConnectionFactory(
getRabbitConnectionFactoryBean(properties).getObject());
map.from(properties::determineAddresses).to(factory::setAddresses);
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis)
.to(factory::setChannelCheckoutTimeout);
RabbitProperties.Cache.Connection connection = properties.getCache()
.getConnection();
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
map.from(connection::getSize).whenNonNull()
.to(factory::setConnectionCacheSize);
map.from(connectionNameStrategy::getIfUnique).whenNonNull()
.to(factory::setConnectionNameStrategy);
return factory;
}
//配置自動配置的連線 配置RabbitProperties裡的屬性
private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(
RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
map.from(properties::determinePort).to(factory::setPort);
map.from(properties::determineUsername).whenNonNull()
.to(factory::setUsername);
map.from(properties::determinePassword).whenNonNull()
.to(factory::setPassword);
map.from(properties::determineVirtualHost).whenNonNull()
.to(factory::setVirtualHost);
map.from(properties::getRequestedHeartbeat).whenNonNull()
.asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
RabbitProperties.Ssl ssl = properties.getSsl();
if (ssl.isEnabled()) {
factory.setUseSSL(true);
map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
map.from(ssl::getKeyStore).to(factory::setKeyStore);
map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
map.from(ssl::getTrustStore).to(factory::setTrustStore);
map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
map.from(ssl::isValidateServerCertificate).to((validate) -> factory
.setSkipServerCertificateValidation(!validate));
map.from(ssl::getVerifyHostname).when(Objects::nonNull)
.to(factory::setEnableHostnameVerification);
if (ssl.getVerifyHostname() == null && CAN_ENABLE_HOSTNAME_VERIFICATION) {
factory.setEnableHostnameVerification(true);
}
}
map.from(properties::getConnectionTimeout).whenNonNull()
.asInt(Duration::toMillis).to(factory::setConnectionTimeout);
factory.afterPropertiesSet();
return factory;
}
}
- RabbitProperties:封裝了RabbitMQ的所有配置
//RabbitProperties封裝了RabbitMQ的所有配置
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
- rabbitTemplate:給RabbitMQ傳送和接受訊息的
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
//如果有自己設定的messageConverter訊息轉換器,也會在這設定進來
template.setMessageConverter(messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate(createRetryTemplate(properties.getRetry()));
}
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(properties::getExchange).to(template::setExchange);
map.from(properties::getRoutingKey).to(template::setRoutingKey);
return template;
}
RabbitTemplate.java
//預設使用的是SimpleMessageConverter訊息轉換器
private volatile MessageConverter messageConverter = new SimpleMessageConverter();
//SimpleMessageConverter.java
//預設序列化資料的時候,按照jdk序列化規則來序列化
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[])((byte[])object);
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
try {
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
messageProperties.setContentType("application/x-java-serialized-object");
}
if (bytes != null) {
messageProperties.setContentLength((long)bytes.length);
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
- amqpAdmin:RabbitMQ系統管理功能元件,建立和刪除Queue、Exchange、Binding
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}