Springboot整合一之Springboot整合RabbitMQ
1.首先我們簡單瞭解一下訊息中介軟體的應用場景
非同步處理 場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊,傳統的做法有兩種1.序列的方式;2.並行的方式 (1)序列方式:將註冊資訊寫入資料庫後,傳送註冊郵件,再發送註冊簡訊,以上三個任務全部完成後才返回給客戶端。 這有一個問題是,郵件,簡訊並不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.
(2)並行方式:將註冊資訊寫入資料庫後,傳送郵件的同時,傳送簡訊,以上三個任務完成後,返回給客戶端,並行的方式能提高處理的時間。
假設三個業務節點分別使用50ms,序列方式使用時間150ms,並行使用時間100ms。雖然並性已經提高的處理時間,但是,前面說過,郵件和簡訊對我正常的使用網站沒有任何影響,客戶端沒有必要等著其傳送完成才顯示註冊成功,英愛是寫入資料庫後就返回. (3)訊息佇列 引入訊息佇列後,把傳送郵件,簡訊不是必須的業務邏輯非同步處理
由此可以看出,引入訊息佇列後,使用者的響應時間就等於寫入資料庫的時間+寫入訊息佇列的時間(可以忽略不計),引入訊息佇列後處理後,響應時間是序列的3倍,是並行的2倍。
應用解耦
場景:雙11是購物狂節,使用者下單後,訂單系統需要通知庫存系統,傳統的做法就是訂單系統呼叫庫存系統的介面.
這種做法有一個缺點:
- 當庫存系統出現故障時,訂單就會失敗。
-
訂單系統和庫存系統高耦合. 引入訊息佇列
訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功。
庫存系統:訂閱下單的訊息,獲取下單訊息,進行庫操作。 就算庫存系統出現故障,訊息佇列也能保證訊息的可靠投遞,不會導致訊息丟失。 流量削峰 流量削峰一般在秒殺活動中應用廣泛 場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入訊息佇列。 作用: 1.可以控制活動人數,超過此一定閥值的訂單直接丟棄(我為什麼秒殺一次都沒有成功過呢^^) 2.可以緩解短時間的高流量壓垮應用(應用程式按自己的最大處理能力獲取訂單)
1.使用者的請求,伺服器收到之後,首先寫入訊息佇列,加入訊息佇列長度超過最大值,則直接拋棄使用者請求或跳轉到錯誤頁面.
2.秒殺業務根據訊息佇列中的請求資訊,再做後續處理.
以上內容的來源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝
2.各種訊息中介軟體效能的比較:
TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
持久化訊息比較—zeroMq不支援,activeMq和rabbitMq都支援。持久化訊息主要是指:MQ down或者MQ所在的伺服器down了,訊息不會丟失的機制。
可靠性、靈活的路由、叢集、事務、高可用的佇列、訊息排序、問題追蹤、視覺化管理工具、外掛系統、社群—RabbitMq最好,ActiveMq次之,ZeroMq最差。
高併發—從實現語言來看,RabbitMQ最高,原因是它的實現語言是天生具備高併發高可用的erlang語言。
綜上所述:RabbitMQ的效能相對來說更好更全面,是訊息中介軟體的首選。
3.接下來我們在springboot當中整合使用RabbitMQ
第一步:匯入maven依賴
<!-- rabbitmq依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:在application.yml檔案中進行RabbitMQ的相關配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest # guest 3.0以後預設只能本地登陸
password: guest
publisher-confirms: true # 訊息傳送到交換機確認機制,是否確認回撥
publisher-returns: true
virtual-host: /
connection-timeout: 6000
server:
port: 8080
注:使用場景:
- 如果訊息沒有到exchange,則confirm回撥,ack=false
- 如果訊息到達exchange,則confirm回撥,ack=true
- exchange到queue成功,則不回撥return
- exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
第二步:進行相關配置
ExchangeConfig 訊息交換機配置
package com.space.rabbitmq.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 訊息交換機配置 可以配置多個
* @author zhuzhe
* @date 2018/5/25 15:40
* @email [email protected]
*/
@Configuration
public class ExchangeConfig {
/**
* 1.定義direct exchange,繫結queueTest
* 2.durable="true" rabbitmq重啟的時候不需要建立新的交換機
* 3.direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,訊息就被投送到相關的佇列
* fanout交換器中沒有路由鍵的概念,他會把訊息傳送到所有繫結在此交換器上面的佇列中。
* topic交換器你採用模糊匹配路由鍵的原則進行轉發訊息到佇列中
* key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
* 訊息將會轉發給queue引數指定的訊息佇列
*/
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false);
return directExchange;
}
}
QueueConfig 佇列配置
package com.space.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 佇列配置 可以配置多個佇列
* @author zhuzhe
* @date 2018/5/25 13:25
* @email [email protected]
*/
@Configuration
public class QueueConfig {
@Bean
public Queue firstQueue() {
/**
durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列
auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false
exclusive 表示該訊息佇列是否只在當前connection生效,預設是false
*/
return new Queue("first-queue",true,false,false);
}
@Bean
public Queue secondQueue() {
return new Queue("second-queue",true,false,false);
}
}
RabbitMqConfig RabbitMq配置
package com.space.rabbitmq.config;
import com.space.rabbitmq.mqcallback.MsgSendConfirmCallBack;
import com.space.rabbitmq.mqcallback.MsgSendReturnCallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMq配置
* @author zhuzhe
* @date 2018/5/25 13:37
* @email [email protected].com
*/
@Configuration
public class RabbitMqConfig {
/** 訊息交換機的名字*/
public static final String EXCHANGE = "exchangeTest";
/*對列名稱*/
public static final String QUEUE_NAME1 = "first-queue";
public static final String QUEUE_NAME2 = "second-queue";
/*
*
* key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
* 訊息將會轉發給queue引數指定的訊息佇列
*/
/** 佇列key1*/
public static final String ROUTINGKEY1 = "queue_one_key1";
/** 佇列key2*/
public static final String ROUTINGKEY2 = "queue_one_key2";
@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;
/**
* 連線工廠
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 將訊息佇列1和交換機進行繫結,指定佇列key1
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
}
/**
* 將訊息佇列2和交換機進行繫結,指定佇列key2
*/
@Bean
public Binding binding_two() {
return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
}
/**
* queue listener 觀察 監聽模式
* 當有訊息到達時會通知監聽在對應的佇列上的監聽物件
* @return
*/
/*@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
simpleMessageListenerContainer.setExposeListenerChannel(true);
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
return simpleMessageListenerContainer;
}*/
/**
* 自定義rabbit template用於資料的接收和傳送
* 可以設定訊息確認機制和回撥
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// template.setMessageConverter(); 可以自定義訊息轉換器 預設使用的JDK的,所以訊息物件需要實現Serializable
/**若使用confirm-callback或return-callback,
* 必須要配置publisherConfirms或publisherReturns為true
* 每個rabbitTemplate只能有一個confirm-callback和return-callback
*/
template.setConfirmCallback(msgSendConfirmCallBack());
/**
* 使用return-callback時必須設定mandatory為true,或者在配置中設定mandatory-expression的值為true,
* 可針對每次請求的訊息去確定’mandatory’的boolean值,
* 只能在提供’return -callback’時使用,與mandatory互斥
*/
template.setReturnCallback(msgSendReturnCallback());
template.setMandatory(true);
return template;
}
/* 關於 msgSendConfirmCallBack 和 msgSendReturnCallback 的回撥說明:
1.如果訊息沒有到exchange,則confirm回撥,ack=false
2.如果訊息到達exchange,則confirm回撥,ack=true
3.exchange到queue成功,則不回撥return
4.exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
*/
/**
* 訊息確認機制
* Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,
* 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。
* 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)
* 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack(){
return new MsgSendConfirmCallBack();
}
@Bean
public MsgSendReturnCallback msgSendReturnCallback(){
return new MsgSendReturnCallback();
}
}
訊息回撥
package com.space.rabbitmq.mqcallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* 訊息傳送到交換機確認機制
* @author zhuzhe
* @date 2018/5/25 15:53
* @email [email protected]
*/
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回撥id:" + correlationData);
if (ack) {
System.out.println("訊息消費成功");
} else {
System.out.println("訊息消費失敗:" + cause+"\n重新發送");
}
}
}
package com.space.rabbitmq.mqcallback;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author zhuzhe
* @date 2018/5/25 15:54
* @email [email protected]
*/
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("回饋訊息:"+message);
}
}
生產者/訊息傳送者
package com.space.rabbitmq.sender;
import com.space.rabbitmq.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* 訊息傳送 生產者1
* @author zhuzhe
* @date 2018/5/25 14:28
* @email [email protected]
*/
@Slf4j
@Component
public class FirstSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 傳送訊息
* @param uuid
* @param message 訊息
*/
public void send(String uuid,Object message) {
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,
message, correlationId);
}
}
消費者
package com.space.rabbitmq.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 訊息消費者1
* @author zhuzhe
* @date 2018/5/25 17:32
* @email [email protected]
*/
@Component
public class FirstConsumer {
@RabbitListener(queues = {"first-queue","second-queue"}, containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(String message) throws Exception {
// 處理訊息
System.out.println("FirstConsumer {} handleMessage :"+message);
}
}
測試
package com.space.rabbitmq.controller;
import com.space.rabbitmq.sender.FirstSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @author zhuzhe
* @date 2018/5/25 16:00
* @email [email protected]
*/
@RestController
public class SendController {
@Autowired
private FirstSender firstSender;
@GetMapping("/send")
public String send(String message){
String uuid = UUID.randomUUID().toString();
firstSender.send(uuid,message);
return uuid;
}
}
此時,我們就可以啟動專案,進行測試了