快取架構之史上講的最明白的RabbitMQ可靠訊息傳輸實戰演練
快取架構之史上講的最明白的RabbitMQ可靠訊息傳輸實戰演練
一、背景介紹:訊息可靠傳遞的重要性
比如:某個廣告主(如:天貓)想在我們的平臺(如:今日頭條)投放廣告,當通過我們的廣告系統新建廣告的時候,該訊息在同步給redis快取(es)的時候丟失了,而我們又沒有發現,造成該廣告無法正常顯示出來,那這損失就打了,如果1天都沒有該廣告的投放記錄,那就有可能是上百萬的損失了,所以訊息的可靠傳輸多我們的廣告系統也是很重要的。 其實,生活中這樣的場景很場景,再比如:交易系統、訂單系統都必須保證訊息的可靠傳輸,否則,損失是巨大的!!!

二、如何保證訊息的可靠傳遞呢?
1、設定交換機、佇列和訊息都為持久化
**持久化:**保證在伺服器重啟的時候可以保持不丟失相關資訊,重點解決伺服器的異常崩潰而導致的訊息丟失問題。但是,將所有的訊息都設定為持久化,會嚴重影響RabbitMQ的效能,寫入硬碟的速度比寫入記憶體的速度慢的不只一點點。對於可靠性不是那麼高的訊息可以不採用持久化處理以提高整體的吞吐率,在選擇是否要將訊息持久化時,需要在可靠性和吞吐量之間做一個權衡。 處於某種應用場景,如:大流量的訂單交易系統,為了不影響效能,我們可以不設定持久化,但是我們會定時掃描資料庫中的未傳送成功的訊息,進行重試傳送,實際應用場景,我們其實有很多解決方案,不要故步自封,換個角度多想想,只有經歷多了,才能應用的更加得心應手。
1)交換機的持久化
@BeanDirectExchangeadvanceExchange(){returnnewDirectExchange(exchangeName);}
註釋:檢視原始碼,易知,預設是持久化的
2)佇列的持久化
@BeanpublicQueueadvanceQueue(){returnnewQueue(queueName);}
註釋:檢視原始碼,易知,預設是持久化的

3)訊息的持久化
當我們使用RabbitTemplate呼叫了 convertAndSend(String exchange, String routingKey, final Object object) 方法。預設就是持久化模式
注意:
持久化的訊息在到達佇列時就被寫入到磁碟,並且如果可以,持久化的訊息也會在記憶體中儲存一份備份,這樣可以提高一定的效能,只有在記憶體吃緊的時候才會從記憶體中清楚。
非持久化的訊息一般只儲存在記憶體中,在記憶體吃緊的時候會被換入到磁碟中,以節省記憶體空間。

2、生產者訊息確認機制
當訊息傳送出去之後,我們如何知道訊息有沒有正確到達exchange呢?如果在這個過程中,訊息丟失了,我們根本不知道發生了什麼,也不知道是什麼原因導致訊息傳送失敗了 為解決這個問題,主要有如下兩種方案:
通過事務機制實現
通過生產者訊息確認機制(publisher confirm)實現
但是使用 事務機制 實現會嚴重降低RabbitMQ的訊息吞吐量,我們採用一種輕量級的方案—— 生產者訊息確認機制
什麼是訊息確認機制 ? 簡而言之,就是:生產者傳送的訊息一旦被投遞到所有匹配的佇列之後,就會發送一個確認訊息給生產者,這就使得生產者知曉訊息已經正確到達了目的地。 如果訊息和佇列是持久化儲存的,那麼確認訊息會在訊息寫入磁碟之後發出。 再補充一個 Mandatory 引數:當Mandatory引數設為true時,如果目的不可達,會發送訊息給生產者,生產者通過一個回撥函式來獲取該資訊。
3、消費者訊息確認機制
為了保證訊息從佇列可靠地到達消費者,RabbitMQ提供了消費者訊息確認機制(message acknowledgement)。採用訊息確認機制之後,消費者就有足夠的時間來處理訊息,不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ會一直等待並持有訊息,直到消費者確認了該訊息。
4、死信佇列
DLX,Dead Letter Exchange 的縮寫,又死信郵箱、死信交換機。DLX就是一個普通的交換機,和一般的交換機沒有任何區別。 當訊息在一個佇列中變成死信(dead message)時,通過這個交換機將死信傳送到死信佇列中(指定好相關引數,rabbitmq會自動傳送)。
什麼是死信呢?什麼樣的訊息會變成死信呢?
訊息被拒絕(basic.reject或basic.nack)並且requeue=false.
訊息TTL過期
佇列達到最大長度(佇列滿了,無法再新增資料到mq中)
應用場景分析: 在定義業務佇列的時候,可以考慮指定一個死信交換機,並繫結一個死信佇列,當訊息變成死信時,該訊息就會被髮送到該死信佇列上,這樣就方便我們檢視訊息失敗的原因了 **如何使用死信交換機呢?
定義業務(普通)佇列的時候指定引數:
x-dead-letter-exchange: 用來設定死信後傳送的交換機
x-dead-letter-routing-key:用來設定死信的routingKey
@BeanpublicQueuehelloQueue(){//將普通佇列繫結到私信交換機上Map args =newHashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue =newQueue(queueName,true,false,false, args);returnqueue;}
三、實戰演練
專案程式碼下載地址 : ofollow,noindex">https://gitee.com/jikeh/JiKeHCN-RELEASE.git 專案名 :spring-boot-rabbitmq-reliability
1、開啟生產者訊息確認機制
# 開啟發送確認spring.rabbitmq.publisher-confirms=true# 開啟發送失敗退回spring.rabbitmq.publisher-returns=true
2、開啟消費者訊息確認機制
# 開啟ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、基本配置
@ConfigurationpublicclassRabbitConfig{publicfinalstaticString queueName ="hello_queue";/**
* 死信佇列:
*/publicfinalstaticString deadQueueName ="dead_queue";publicfinalstaticString deadRoutingKey ="dead_routing_key";publicfinalstaticString deadExchangeName ="dead_exchange";/**
* 死信佇列 交換機識別符號
*/publicstaticfinalString DEAD_LETTER_QUEUE_KEY ="x-dead-letter-exchange";/**
* 死信佇列交換機繫結鍵識別符號
*/publicstaticfinalString DEAD_LETTER_ROUTING_KEY ="x-dead-letter-routing-key";@BeanpublicQueuehelloQueue(){//將普通佇列繫結到私信交換機上Map args =newHashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue =newQueue(queueName,true,false,false, args);returnqueue; }/**
* 死信佇列:
*/@BeanpublicQueuedeadQueue(){ Queue queue =newQueue(deadQueueName,true);returnqueue; }@BeanpublicDirectExchangedeadExchange(){returnnewDirectExchange(deadExchangeName); }@BeanpublicBindingbindingDeadExchange(Queue deadQueue, DirectExchange deadExchange){returnBindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); }}
註釋:hell_queue就配置了死信交換機、死信佇列

4、生產者核心程式碼
@ComponentpublicclassHelloSenderimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(String exchange, String routingKey){ String context ="你好現在是 "+newDate(); System.out.println("send content = "+ context);this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);this.rabbitTemplate.convertAndSend(exchange, routingKey, context); }/** * 確認後回撥: *@paramcorrelationData *@paramack *@paramcause */@Overridepublicvoidconfirm(CorrelationData correlationData,booleanack, String cause){if(!ack) { System.out.println("send ack fail, cause = "+ cause); }else{ System.out.println("send ack success"); } }/** * 失敗後return回撥: * *@parammessage *@paramreplyCode *@paramreplyText *@paramexchange *@paramroutingKey */@OverridepublicvoidreturnedMessage(Message message,intreplyCode, String replyText, String exchange, String routingKey){ System.out.println("send fail return-message = "+newString(message.getBody()) +", replyCode: "+ replyCode +", replyText: "+ replyText +", exchange: "+ exchange +", routingKey: "+ routingKey); }}
5、消費者核心程式碼
@Component@RabbitListener(queues = RabbitConfig.queueName)publicclassHelloReceiver{@RabbitHandlerpublicvoidprocess(String hello, Channel channel, Message message)throwsIOException{try{ Thread.sleep(2000); System.out.println("睡眠2s"); }catch(InterruptedException e) { e.printStackTrace(); }try{//告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉;否則訊息伺服器以為這條訊息沒處理掉 後續還會在發channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("receiver success = "+ hello); }catch(Exception e) { e.printStackTrace();//丟棄這條訊息channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); System.out.println("receiver fail"); } }}
6、測試生產者訊息確認功能:分為4種場景來測試
//1、exchange, queue 都正確, confirm被回撥, ack=true@RequestMapping("/send1")@ResponseBodypublicStringsend1(){ helloSender.send(null, RabbitConfig.queueName);return"success";}//2、exchange 錯誤, queue 正確, confirm被回撥, ack=false@RequestMapping("/send2")@ResponseBodypublicStringsend2(){ helloSender.send("fail-exchange", RabbitConfig.queueName);return"success";}//3、exchange 正確, queue 錯誤, confirm被回撥, ack=true; return被回撥 replyText:NO_ROUTE@RequestMapping("/send3")@ResponseBodypublicStringsend3(){ helloSender.send(null,"fail-queue");return"success";}//4、exchange 錯誤, queue 錯誤, confirm被回撥, ack=false@RequestMapping("/send4")@ResponseBodypublicStringsend4(){ helloSender.send("fail-exchange","fail-queue");return"success";}
7、測試消費者訊息確認功能
1)當新增這行程式碼的時候: channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 測試結果:訊息被正常消費,訊息從佇列中刪除
2)當註釋掉這行程式碼的時候: channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 測試結果:訊息會被重複消費,一直保留在隊列當中
8、測試死信佇列
當執行這行程式碼的時候: channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 訊息會被加入到死信佇列中:

四、拓展
除了我們上面講的基本可靠性保證外,其實還有很多效能優化方案、可靠性保證方案:叢集監控、流控、映象佇列、HAProxy+Keeplived高可靠負載均衡 我們後續會繼續分享上述內容,歡迎持續關注…… 下節課,我們將會將該功能應用到快取架構上了
給大家推薦一個程式員學習交流群:863621962。群裡有分享的視訊,還有思維導圖
群公告有視訊,都是乾貨的,你可以下載來看。主要分享分散式架構、高可擴充套件、高效能、高併發、效能優化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分散式專案實戰學習架構師視訊