歡迎關注個人公眾號:石杉的架構筆記(ID:shishan100)

週一至週五早8點半!精品技術文章準時送上!

一、前情提示


上一篇文章億級流量系統架構之如何在上萬併發場景下設計可擴充套件架構(中)?分析了一下如何利用訊息中介軟體對系統進行解耦處理。

同時,我們也提到了使用訊息中介軟體還有利於一份資料被多個系統同時訂閱,供多個系統來使用於不同的目的。

目前的一個架構如下圖所示。


在這個圖裡,我們可以清晰的看到,實時計算平臺釋出的一份資料到訊息中介軟體裡,接著,會進行如下步驟:

  1. 資料查詢平臺,會訂閱這份資料,並落入自己本地的資料庫叢集和快取叢集裡,接著對外提供資料查詢的服務
  2. 資料質量監控系統,會對計算結果按照一定的業務規則進行監控,如果發現有資料計算錯誤,則會立馬進行報警
  3. 資料鏈路追蹤系統,會採集計算結果作為一個鏈路節點,同時對一條資料的整個完整計算鏈路都進行採集並組裝出來一系列的資料計算鏈路落地儲存,最後如果某個資料計算錯誤了,就可以立馬通過計算鏈路進行回溯排查問題


因此上述場景中,使用訊息中介軟體一來可以解耦,二來還可以實現訊息“Pub/Sub”模型,實現訊息的釋出與訂閱。

這篇文章,咱們就來看看,假如說基於RabbitMQ作為訊息中介軟體,如何實現一份資料被多個系統同時訂閱的“Pub/Sub”模型。


二、基於訊息中介軟體的佇列消費模型

上面那個圖,其實就是採用的RabbitMQ最基本的佇列消費模型的支援。

也就是說,你可以理解為RabbitMQ內部有一個佇列,生產者不斷的傳送資料到佇列裡,訊息按照先後順序進入佇列中排隊。

接著,假設佇列裡有4條資料,然後我們有2個消費者一起消費這個佇列的資料。

此時每個消費者會均勻的被分配到2條資料,也就是說4條資料會均勻的分配給各個消費者,每個消費者只不過是處理一部分資料罷了,這個就是典型的佇列消費模型。

如果有同學對這塊基於RabbitMQ如何實現有點不太清楚的話,可以參考之前的一些文章:


之前這幾篇文章,基本給出了上述那個最基本的佇列消費模型的RabbitMQ程式碼實現,以及如何保證消費者宕機時資料不丟失,如何讓RabbitMQ叢集對queue和message都進行持久化。基本上整體程式碼實現都比較完整,大家可以去參考一下。


三、基於訊息中介軟體的“Pub/Sub”模型


但是訊息中介軟體還可以實現一種“Pub/Sub”模型,也就是“釋出/訂閱”模型,Pub就是Publish,Sub就是Subscribe。

這種模型是可以支援多個系統同時消費一份資料的。也就是說,你釋出出去的每條資料,都會廣播給每個系統。

給大家來一張圖,一起來感受一下。

如上圖所示。也就是說,我們想要實現的上圖的效果,實時計算平臺釋出一系列的資料到訊息中介軟體裡。

然後資料查詢平臺、資料質量監控系統、資料鏈路追蹤系統,都會訂閱資料,都會消費到同一份完整的資料,每個系統都可以根據自己的需要使用資料。

這,就是所謂的“Pub/Sub”模型,一個系統釋出一份資料出去,多個系統訂閱和消費到一模一樣的一份資料。

那如果要實現上述的效果,基於RabbitMQ應該怎麼來處理呢?


四、RabbitMQ中的exchange到底是個什麼東西?


實際上來說,在RabbitMQ裡面是不允許生產者直接投遞訊息到某個queue(佇列)裡的,而是隻能讓生產者投遞訊息給RabbitMQ內部的一個特殊元件,叫做“exchange”。

關於這個exchange,大概你可以把這個元件理解為一種訊息路由的元件。

也就是說,實時計算平臺傳送出去的message到RabbitMQ中都是由一個exchange來接收的。

然後這個exchange會根據一定的規則決定要將這個message路由轉發到哪個queue裡去,這個實際上就是RabbitMQ中的一個核心的訊息模型。

大家看下面的圖,一起來理解一下。


五、預設的exchange


在之前的文章裡,我們投遞訊息到RabbitMQ的時候,也沒有用什麼exchange,但是為什麼就還是把訊息投遞到了queue裡去呢?

那是因為我們用了預設的exchange,他會直接把訊息路由到你指定的那個queue裡去,所以如果簡單用佇列消費模型,不就省去了exchange的概念了嗎。


上面這段就是之前我們給大家展示的,讓訊息持久化的一種投遞訊息的方式。

大家注意裡面的第一個引數,是一個空的字串,這個空字串的意思,就是說投遞訊息到預設的exchange裡去,然後他就會路由訊息到我們指定的queue裡去。


六、將訊息投遞到fanout exchange


在RabbitMQ裡,exchange這種元件有很多種型別,比如說:direct、topic、headers以及fanout。這裡咱們就來看看最後一種,fanout這種型別的exchange元件。

這種exchange元件其實非常的簡單,你可以建立一個fanout型別的exchange,然後給這個exchange繫結多個queue。

接著只要你投遞一條訊息到這個exchange,他就會把訊息路由給他繫結的所有queue。

使用下面的程式碼就可以建立一個exchange,比如說在實時計算平臺(生產者)的程式碼裡,可以加入下面的一段,建立一個fanout型別的exchange。第一個引數我們叫做“rt_compute_data”,這個就是exchange的名字,rt就是“RealTime”的縮寫,意思就是實時計算系統的計算結果資料。

第二個引數就是定義了這個exchange的型別是“fanout”。

channel.exchangeDeclare(
            "rt_compute_data", 
            "fanout");
複製程式碼

接著我們就採用下面的程式碼來投遞訊息到我們建立好的exchange元件裡去:

大家會注意到,此時訊息就是投遞到指定的exchange裡去了,但是路由到哪個queue裡去呢?此時我們暫時還沒確定,要讓消費者自己來把自己的queue繫結到這個exchange上去才可以。


七、繫結自己的佇列到exchange上去消費


我們對消費者的程式碼也進行修改,之前我們在這裡關閉了autoAck機制,然後每次都是自己手動ack。


上面的程式碼裡,每個消費者系統,都會有一些不一樣,就是每個消費者都需要定義自己的佇列,然後繫結到exchange上去。比如:

  • 資料查詢平臺的佇列是“rt_compute_data_query
  • 資料質量監控平臺的佇列是“rt_compute_data_monitor
  • 資料鏈路追蹤系統的佇列是“rt_compute_data_link


這樣,每個訂閱這份資料的系統其實都有一個屬於自己的佇列,然後佇列裡被會被exchange路由進去實時計算平臺生產的所有資料。

而且因為是多個佇列的模式,每個系統都可以部署消費者叢集來進行資料的消費和處理,非常的方便。


八、整體架構圖


最後,給大家來一張大圖,我們再跟著圖,來捋一捋整個流程。

如上圖所示,首先,實時計算平臺會投遞訊息到“rt_compute_data”這個“exchange”裡去,但是他沒指定這個exchange要路由訊息到哪個佇列,因為這個他本身是不知道的。

接著資料查詢平臺、資料質量監控系統、資料鏈路追蹤系統,就可以宣告自己的佇列,都繫結到exchange上去。

因為queue和exchange的繫結,在這裡是要由訂閱資料的平臺自己指定的。而且因為這個exchange是fanout型別的,他只要接收到了資料,就會路由資料到所有繫結到他的佇列裡去,這樣每個佇列裡都有同樣的一份資料,供對應的平臺來消費。

而且針對每個平臺自己的佇列,自己還可以部署消費服務叢集來消費自己的一個佇列,自己的佇列裡的資料還是會均勻分發給各個消費服務例項來處理,每個消費服務例項會獲取到一部分的資料。

大家思考一下,這樣是不是就實現了不同的系統訂閱一份資料的“Pub/Sub”的模型?

當然,其實RabbitMQ還支援各種不同型別的exchange,可以實現各種複雜的功能。

後續我們將會給大家通過實際的線上系統架構案例,來闡述訊息中介軟體技術的各種用法。

END


如有收穫,請幫忙轉發,您的鼓勵是作者最大的動力,謝謝!


一大波微服務、分散式、高併發、高可用的原創系列文章正在路上

歡迎掃描下方二維碼,持續關注:


石杉的架構筆記(id:shishan100)

十餘年BAT架構經驗傾囊相授


推薦閱讀:

1、拜託!面試請不要再問我Spring Cloud底層原理

2、【雙11狂歡的背後】微服務註冊中心如何承載大型系統的千萬級訪問?

3、【效能優化之道】每秒上萬併發下的Spring Cloud引數優化實戰

4、微服務架構如何保障雙11狂歡下的99.99%高可用

5、兄弟,用大白話告訴你小白都能聽懂的Hadoop架構原理

6、大規模叢集下Hadoop NameNode如何承載每秒上千次的高併發訪問

7、【效能優化的祕密】Hadoop如何將TB級大檔案的上傳效能優化上百倍

8、拜託,面試請不要再問我TCC分散式事務的實現原理坑爹呀!

9、【坑爹呀!】最終一致性分散式事務如何保障實際生產中99.99%高可用?

10、拜託,面試請不要再問我Redis分散式鎖的實現原理!

11、【眼前一亮!】看Hadoop底層演算法如何優雅的將大規模叢集效能提升10倍以上?

12、億級流量系統架構之如何支撐百億級資料的儲存與計算

13、億級流量系統架構之如何設計高容錯分散式計算系統

14、億級流量系統架構之如何設計承載百億流量的高效能架構

15、億級流量系統架構之如何設計每秒十萬查詢的高併發架構

16、億級流量系統架構之如何設計全鏈路99.99%高可用架構

17、七張圖徹底講清楚ZooKeeper分散式鎖的實現原理

18、大白話聊聊Java併發面試問題之volatile到底是什麼?

19、大白話聊聊Java併發面試問題之Java 8如何優化CAS效能?

20、大白話聊聊Java併發面試問題之談談你對AQS的理解?

21、大白話聊聊Java併發面試問題之公平鎖與非公平鎖是啥?

22、大白話聊聊Java併發面試問題之微服務註冊中心的讀寫鎖優化

23、網際網路公司的面試官是如何360°無死角考察候選人的?(上篇)

24、網際網路公司面試官是如何360°無死角考察候選人的?(下篇)

25、Java進階面試系列之一:哥們,你們的系統架構中為什麼要引入訊息中介軟體?

26、【Java進階面試系列之二】:哥們,那你說說系統架構引入訊息中介軟體有什麼缺點?

27、【行走的Offer收割機】記一位朋友斬獲BAT技術專家Offer的面試經歷

28、【Java進階面試系列之三】哥們,訊息中介軟體在你們專案裡是如何落地的?

29、【Java進階面試系列之四】扎心!線上服務宕機時,如何保證資料100%不丟失?

30、一次JVM FullGC的背後,竟隱藏著驚心動魄的線上生產事故!

31、【高併發優化實踐】10倍請求壓力來襲,你的系統會被擊垮嗎?

32、【Java進階面試系列之五】訊息中介軟體叢集崩潰,如何保證百萬生產資料不丟失?

33、億級流量系統架構之如何在上萬併發場景下設計可擴充套件架構(上)?

34、億級流量系統架構之如何在上萬併發場景下設計可擴充套件架構(中)?



作者:石杉的架構筆記
連結:https://juejin.im/post/5c23901a51882565986a1909
來源:掘金
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。