1. 程式人生 > >基於springboot工程淺談整合rabbitmq怎麼樣防止訊息傳送mq不丟失和消費mq的訊息防止丟失

基於springboot工程淺談整合rabbitmq怎麼樣防止訊息傳送mq不丟失和消費mq的訊息防止丟失

本文只針對springboot整合rabbitmq的訊息防丟失,話不多說,上乾貨....

 

設定傳送mq訊息不丟失實現思路

 

執行的方案:

第一步,要對佇列,訊息以及交換機進行持久化操作(儲存到物理磁碟中)

因為mq訊息預設是儲存在記憶體中

交換機我們在宣告的時候可以進行持久化

 

@Bean(EX_BUYING_ADDPOINTUSER)
public Exchange EX_BUYING_ADDPOINTUSER(){
return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
}
解析:
durable(true)表示對當前交換進行持久化

 

佇列持久化

@Bean
public Queue queue(){
return new Queue(ORDER_TACK);
}
解析:
當前new的過程中如果只有一個引數則表示預設的就是已經持久化了
原始碼:
public Queue(String name) {
this(name, true, false, false);
}

 

注意:

訊息持久化,不需要設定的,我們的訊息是儲存在佇列中,佇列如果說是持久化的,那麼我們的訊息就是持久化的。

 

confirm機制
confirm模式需要基於channel進行設定, 一旦某條訊息被投遞到佇列之後,訊息佇列就會發送一個確認資訊給生產者,如果佇列與訊息是可持久化的, 那麼確認訊息會等到訊息成功寫入到磁碟之後發出.
confirm的效能高,主要得益於它是非同步的.生產者在將第一條訊息發出之後等待確認訊息的同時也可以繼續傳送後續的訊息.當確認訊息到達之後,就可以通過回撥方法處理這條確認訊息. 如果MQ服務宕機了,則會返回nack訊息. 生產者同樣在回撥方法中進行後續處理。

 

思路是把要傳送的訊息先放一份到redis中 ,當訊息發到了交換機exchange中完成就回返回ack為true,那麼就完成傳送刪除redis的訊息,否則就從redis取出訊息再次傳送.直到傳送成功....

//增強rabbitmq,代替原來發送訊息的一個類,可以防止丟失資料
@Component
public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private RedisTemplate redisTemplate;

public static final String MESSAGE_CONFIRM_KEY="message_confirm_";

//有參構造
public ConfirmMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
//rabbitmq設定提交回調
rabbitTemplate.setConfirmCallback(this);
}
//接收訊息伺服器返回的通知的 ,成功的通知和失敗通知,第二步
/**
*
* @param correlationData 用來保證訊息的唯一
* @param ack 應答 true表示成功的通知,false失敗的通知
* @param cause 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
//成功通知,表示我們資料已經發送成功並且持久化到了磁碟中
//刪除redis中的相關資料
redisTemplate.delete(correlationData.getId());
redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId());
}else{
//失敗通知 ,沒有持久化到磁碟中
//從redis中獲取剛才的訊息內容
Map<String,String> map = (Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY+correlationData.getId());
//重新發送
String exchange = map.get("exchange");
String routingkey = map.get("routingKey");
String message = map.get("message");
//再次傳送,直到持久化到磁碟為止,每次傳送都帶著唯一標識
rabbitTemplate.convertAndSend(exchange,routingkey, JSON.toJSONString(message),correlationData);
}
}

//自定義訊息傳送方法,第一步,當執行該方法的之後,去呼叫confirm
public void sendMessage(String exchange,String routingKey,String message){
/**
* 重點是 CorrelationData 物件,每個傳送的訊息都需要配備一個 CorrelationData 相關資料物件,CorrelationData 物件內部只有一個 id 屬性,用來表示當前訊息唯一性。
*/
//設定訊息的唯一標識並存入到redis中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//把訊息快取到redis中 ,目的是備份
redisTemplate.opsForValue().set(correlationData.getId(),message);

//將本次傳送訊息的相關元資料儲存到redis中
Map<String,String> map = new HashMap<>();
map.put("exchange",exchange);
map.put("routingKey",routingKey);
map.put("message",message);
//把元資料快取到redis中,保證訊息的唯一
redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);

//攜帶著本次訊息的唯一標識,進行資料傳送
rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);

}
}

 

獲取獲取mq中的訊息不丟失?

 

在application.yml中設定手動應答:

rabbitmq:
host: 192.168.200.128
listener:
simple:
acknowledge-mode: manual #手動

 

在監聽類中設定手動應答:

簡單來說就是當業務邏輯處理沒問題就執行channel.basicAck的方法,來返回消費完成,如果出現問題了 就執行channel.basicNack的方法,訊息會回到原有的佇列,重新的傳送,一直到訊息傳送業務邏輯執行成功

@Component
public class ConsumeListener {
@RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY)
public void receiveSecKillOrderMessage(Channel channel, Message message){
try {

.....邏輯處理........

 


channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
//返回失敗通知
//第一個boolean true所有消費者都會拒絕這個訊息,false代表只有當前消費者拒絕
//第二個boolean false當前訊息會進入到死信佇列,true重新回到原有佇列中,預設回到頭部
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
} catch (IOException e) {
e.printStackTrace();
}

}

}

 

流量削峰:保證在同一時間內流量進行削弱,然後放行過來

在監聽類中設定:

channel.basicQos(300);

這個300是官方給出的值,代表每次在mq中抓取300個訊息消費,

太大會影響伺服器的效能,太小就回浪費

 

希望對大家有幫助...

 

&n