高併發的核心技術 - 訊息中介軟體(MQ)
MQ簡介
-
什麼是MQ
跨程序的訊息佇列,主要角色包括生產者與消費者。
生產者只負責生產資訊,無法感知消費者是誰,訊息怎麼處理,處理結果是什麼。
消費者負責接收及處理訊息,無法感知生產者是誰,怎麼產生的。
-
Mq能做什麼?
MQ 特性一般有非同步,吞吐量大 ,延時低;
適合做:
- 投遞非同步通知。
- 限流,削峰谷。
- 可靠事件,處理資料一致性。
- 利用一些特性,可以做定時任務。
等….
由於MQ是非同步處理訊息的,所以MQ不適合做同步處理操作,如果需要及時的返回處理結果請不要用MQ;
-
MQ 個系統帶來了什麼?
缺點:增加了系統的複雜性,除了程式碼元件接入以外還需要考慮,高可用,叢集,訊息的可靠性等問題!
生產者:訊息傳送怎麼保證可靠性,怎麼保證不重複!
消費者:怎麼保證冪等性,接收到重複訊息怎麼處理!
還有會帶來的處理延時等問題!
優點: 解耦,利用MQ我們可以很好的給我們系統解耦,特別是分散式/微服系統!
原來的同步操作,可以用非同步處理,也可以帶來更快的響應速度;
- 哪些場景可以使用MQ
場景 (1)
系統解耦,使用者系統或者其他系統需要傳送簡訊可以通過 MQ 執行;很好的將 使用者系統 和 簡訊系統進行解耦;

場景(2)
順序執行的任務場景,假設 A B C 三個任務,B需要等待 A完成才去執行,C需要等待B完成才去執行;
我見過一些同學的做法是 ,用 三個定時器 錯開時間去執行的,假設 A定時器 9 點執行, B 定時器 10 點執行 , C 11 點執行 , 類似這樣子;
這樣做其實是 不安全的, 因為 後一個任務 無法知道 前一個任務是否 真的執行了! 假設 A 宕機了, 到 10 點 B 定時去 執行,這時候 資料就會產生異常!
當我們 引入 MQ 後 可以這麼做, A執行完了 傳送 訊息給 B ,B收到訊息後 執行,C 類似,收到 B訊息後執行;
場景(3)
支付閘道器的通知,我們的系統常常需要接入支付功能,微信或者支付寶通常會以回撥的形式通知我們系統支付結果。
我們可以將我們的支付閘道器獨立出來,通過MQ通知我們業務系統進行處理,這樣處理有利於系統的解耦和擴充套件!
假設我們還有一個積分系統,使用者支付成功,給使用者新增積分。只需要積分系統監聽這個訊息,並處理積分就好,無需去修改再去修改閘道器層程式碼!
如果沒有使用MQ ,我是不是還得去修改網關係統的程式碼,遠端呼叫增加積分的介面?
這就是使用了MQ的好處,解耦和擴充套件!
當然我們的轉發規則也要保證每個感興趣的佇列能獲取到訊息!

場景(4)
微服/分散式系統,分散式事務 - 最終一致性 處理方案!
場景(5)
- 訊息延時佇列,可做些定時任務,不固定時間執行的定時任務。
- 例如:使用者下單後如果24小時未支付訂單取消;
- 確認收貨後2天后沒有評價自動好評;
等...
我們以前的做法是 通常啟用一個定時器,每分鐘或者每小時,去跑一次取出需要處理的訂單或其他資料進行處理。
這種做法一個是 效率比較低,如果資料量大的話,每次都要掃庫,非常要命!
再者時效性不是很高,最差的時候可能需要等待一輪時長!
還有可能出現重複執行的結果,時效和輪詢的頻率難以平衡!
利用MQ(Rabbitmq),DLX (Dead Letter Exchanges)和 訊息的 TTL (Time-To-Live Extensions)特性。我們可以高效的完成這個任務場景!不需要掃庫,時效性更好!
DLX: http://www.rabbitmq.com/dlx.html,
TTL: http://www.rabbitmq.com/ttl.html#per-message-ttl
原理:
傳送到佇列的訊息,可以設定一個存活時間 TTL,在存活時間內沒有被消費,可以設定這個訊息轉發到其他佇列裡面去;然後我們從這個其他佇列裡面消費執行我們的任務,這樣就可以達到一個訊息延時的效果!

設定過期時間:
過期時間可以統一設定到訊息佇列裡面,也可以單獨設定到某個訊息!
PS 如果訊息設定了過期時間,發生到了設定有過期時間的佇列,已佇列設定的過期時間為準!
已 SpringBoot 為例:
配置轉發佇列和被轉發佇列:
@Component @Configuration public class RabbitMqConfig { @Bean public Queue curQueue() { Map<String, Object> args = new HashMap<String, Object>(); //超時後的轉發器 過期轉發到 delay_queue_exchange args.put("x-dead-letter-exchange", "delay_queue_exchange"); //routingKey 轉發規則 args.put("x-dead-letter-routing-key", "user.#"); //過期時間 20 秒 args.put("x-message-ttl", 20000); return new Queue("cur_queue", false, false, false, args); } @Bean public Queue delayQueue() { return new Queue("delay_queue"); } @Bean TopicExchange exchange() { //當前佇列 return new TopicExchange("cur_queue_exchange"); } @Bean TopicExchange exchange2() { //被轉發的佇列 return new TopicExchange("delay_queue_exchange"); } @Bean Binding bindingHelloQueue(Queue curQueue, TopicExchange exchange) { //繫結佇列到轉發器 return BindingBuilder.bind(curQueue).to(exchange).with("user.#"); } @Bean Binding bindingHelloQueue2(Queue delayQueue, TopicExchange exchange2) { return BindingBuilder.bind(delayQueue).to(exchange2).with("user.#"); } }
發生訊息:
@Component public class MqEventSender { Logger logger = LoggerFactory.getLogger(MqEventSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 訊息沒有設定 時間 *發生到佇列 cur_queue_exchange * @param msg */ public void sendMsg(String msg) { logger.info("傳送訊息: " + msg); rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", msg); } /** * 訊息設定時間 *發生到佇列 cur_queue_exchange * @param msg */ public void sendMsgWithTime(String msg) { logger.info("傳送訊息: " + msg); MessageProperties messageProperties = new MessageProperties(); //過期時間設定 10 秒 messageProperties.setExpiration("10000"); Message message = rabbitTemplate.getMessageConverter().toMessage(msg, messageProperties); rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", message); } }
訊息監聽:
監聽 的佇列是 delay_queue 而不是 cur_queue;
PS cur_queue 不應該有監聽者,否則訊息被消費達不到想要的延時訊息效果!
/** * Created by linli on 2017/8/21. * 監聽 被丟到 超時佇列內容 */ @Component @RabbitListener(queues = "delay_queue") public class DelayQueueListener { public static Logger logger = LoggerFactory.getLogger(AddCommentsEventListener.class); @RabbitHandler public void process(@Payload String msg) { logger.info("收到訊息 "+msg); } }
測試:
/** * Created by linli on 2017/8/21. */ @RestController @RequestMapping("/test") public class TestContorller { @Autowired MqEventSender sender; @RequestMapping("/mq/delay") public String test() { sender.sendMsg("佇列延時訊息!"); sender.sendMsgWithTime("訊息延時訊息!"); return ""; } }
結果:

觀察結果發現:傳送時間 和 收到時間 間隔 20秒 ;
我們給訊息設定的 10 秒 TTL 時間沒有生效!驗證了 : 如果訊息設定了過期時間,發生到了設定有過期時間的佇列,已佇列設定的過期時間為準!
如果希望每個訊息都要自己的存活時間,傳送到佇列 不要設定
args.put(“x-message-ttl”, 20000);
訊息的過期時間 設定在佇列還是訊息,根據自己的業務場景去定!
- 總結
MQ 是一個跨程序的訊息佇列,我們可以很好的利用他進行系統的解耦;
引入MQ會給系統帶來一定的複雜度,需要評估!
MQ 適合做非同步任務,不適合做同步任務!
本文的重點是你有沒有收穫與成長,其餘的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限於:分散式架構、高可擴充套件、高效能、高併發、Jvm效能調優、Spring,MyBatis,Nginx原始碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高階進階乾貨,希望對想成為架構師的朋友有一定的參考和幫助
需要更詳細思維導圖和以下資料的可以加一下技術交流分享群:“708 701 457”免費獲取


