1. 程式人生 > >SpringBoot整合RabbitMQ之 典型應用場景實戰一

SpringBoot整合RabbitMQ之 典型應用場景實戰一

實戰前言

RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。

RabbitMQ 官網拜讀

首先,讓我們先拜讀 RabbitMQ 官網的技術開發手冊以及相關的 Features,感興趣的朋友可以耐心的閱讀其中的相關介紹,相信會有一定的收穫,地址可見:

http://www.rabbitmq.com/getstarted.html

閱讀該手冊過程中,我們可以得知 RabbitMQ 其實核心就是圍繞 “訊息模型” 來展開的,其中就包括了組成訊息模型的相關元件:生產者,消費者,佇列,交換機,路由,訊息等!而我們在實戰應用中,實際上也是緊緊圍繞著 “訊息模型” 來展開擼碼的!

下面,我就介紹一下這一訊息模型的演變歷程,當然,這一歷程在 RabbitMQ 官網也是可以窺覽得到的!

enter image description here

enter image description here

enter image description here

上面幾個圖就已經概述了幾個要點,而且,這幾個要點的含義可以說是字如其名!

  1. 生產者:傳送訊息的程式
  2. 消費者:監聽接收消費訊息的程式
  3. 訊息:一串二進位制資料流
  4. 佇列:訊息的暫存區/儲存區
  5. 交換機:訊息的中轉站,用於接收分發訊息。其中有 fanout、direct、topic、headers 四種
  6. 路由:相當於金鑰/第三者,與交換機繫結即可路由訊息到指定的佇列!

正如上圖所展示的訊息模型的演變,接下來我們將以程式碼的形式實戰各種典型的業務場景!

SpringBoot 整合 RabbitMQ 實戰

工欲善其事,必先利其器。我們首先需要藉助 IDEA 的 Spring Initializr 用 Maven 構建一個 SpringBoot 的專案,並引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依賴。搭建完成之後,可以簡單的寫個 RabbitMQController 測試一下專案是否搭建是否成功(可以暫時用單模組方式構建)

緊接著,我們進入實戰的核心階段,在專案或者服務中使用 RabbitMQ,其實無非是有幾個核心要點要牢牢把握住,這幾個核心要點在擼碼過程中需要“時刻的遊蕩在自己的腦海裡”,其中包括:

  1. 我要傳送的訊息是什麼
  2. 我應該需要建立什麼樣的訊息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等
  3. 我要處理的訊息是實時的還是需要延時/延遲的?
  4. 訊息的生產者需要在哪裡寫,訊息的監聽消費者需要在哪裡寫,各自的處理邏輯是啥

基於這樣的幾個要點,我們先小試牛刀一番,採用 RabbitMQ 實戰非同步寫日誌與非同步發郵件。當然啦,在進行實戰前,我們需要安裝好 RabbitMQ 及其後端控制檯應用,並在專案中配置一下 RabbitMQ 的相關引數以及相關 Bean 元件。

RabbitMQ 安裝完成後,開啟後端控制檯應用:http://localhost:15672  輸入guest guest 登入,看到下圖即表示安裝成功

enter image description here

然後是專案配置檔案層面的配置 application.properties

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5

其中,後面三個引數主要是用於“併發量的配置”,表示:併發消費者的初始化值,併發消費者的最大值,每個消費者每次監聽時可拉取處理的訊息數量。

接下來,我們需要以 Configuration 的方式配置 RabbitMQ 並以 Bean 的方式顯示注入 RabbitMQ 在傳送接收處理訊息時相關 Bean 元件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充當訊息的傳送元件,後者是用於管理  RabbitMQ監聽器listener 的容器工廠,其程式碼如下:

@Configuration
    public class RabbitmqConfig {
    private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 單一消費者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * 多個消費者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("訊息傳送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("訊息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }}

RabbitMQ 實戰:業務模組解耦以及非同步通訊

在一些企業級系統中,我們經常可以見到一個執行 function 通常是由許多子模組組成的,這個 function 在執行過程中,需要 同步的將其程式碼從頭開始執行到尾,即執行流程是 module_A -> module_B -> module_C -> module_D,典型的案例可以參見彙編或者 C 語言等面向過程語言開發的應用,現在的一些 JavaWeb 應用也存在著這樣的寫法。

而我們知道,這個執行流程其實對於整個 function 來講是有一定的弊端的,主要有幾點:

  1. 整個 function 的執行響應時間將很久;
  2. 如果某個 module 發生異常而沒有處理得當,可能會影響其他 module 甚至整個 function 的執行流程與結果;
  3. 整個 function 中程式碼可能會很冗長,模組與模組之間可能需要進行強通訊以及資料的互動,出現問題時難以定位與維護,甚至會陷入 “改一處程式碼而動全身”的尷尬境地!

故而,我們需要想辦法進行優化,我們需要將強關聯的業務模組解耦以及某些模組之間實行非同步通訊!下面就以兩個場景來實戰我們的優化措施!

場景一:非同步記錄使用者操作日誌

對於企業級應用系統或者微服務應用中,我們經常需要追溯跟蹤記錄使用者的操作日誌,而這部分的業務在某種程度上是不應該跟主業務模組耦合在一起的,故而我們需要將其單獨抽出並以非同步的方式與主模組進行非同步通訊互動資料。

下面我們就用 RabbitMQ 的 DirectExchange+RoutingKey 訊息模型也實現“使用者登入成功記錄日誌”的場景。如前面所言,我們需要在腦海裡迴盪著幾個要點:

  • 訊息模型:DirectExchange+RoutingKey 訊息模型
  • 訊息:使用者登入的實體資訊,包括使用者名稱,登入事件,來源的IP,所屬日誌模組等資訊
  • 傳送接收:在登入的 Controller 中實現傳送,在某個 listener 中實現接收並將監聽消費到的訊息入資料表;實時傳送接收

首先我們需要在上面的 RabbitmqConfig 類中建立訊息模型:包括 Queue、Exchange、RoutingKey 等的建立,程式碼如下:

enter image description here

上圖中 env 獲取的資訊,我們需要在 application.properties 進行配置,其中 mq.env=local

enter image description here

此時,我們將整個專案/服務跑起來,並開啟 RabbitMQ 後端控制檯應用,即可看到佇列以及交換機及其繫結已經建立好了,如下所示:

enter image description here

enter image description here

接下來,我們需要在 Controller 中執行使用者登入邏輯,記錄使用者登入日誌,查詢獲取使用者角色視野資源資訊等,由於篇幅關係,在這裡我們重點要實現的是用MQ實現 “非同步記錄使用者登入日誌” 的邏輯,即在這裡 Controller 將充當“生產者”的角色,核心程式碼如下:

@RestController
    public class UserController {

    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);

    private static final String Prefix="user";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private UserLogMapper userLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Environment env;

    @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            //TODO:執行登入邏輯
            User user=userMapper.selectByUserNamePassword(userName,password);
            if (user!=null){
                //TODO:非同步寫使用者日誌
                try {
                    UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
                    userLog.setCreateTime(new Date());
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));

                    Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); 
                    rabbitTemplate.convertAndSend(message);         
                }catch (Exception e){
                    e.printStackTrace();
                }

                //TODO:塞許可權資料-資源資料-視野資料
            }else{
                response=new BaseResponse(StatusCode.Fail);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return response;
    }}

在上面的“傳送邏輯”程式碼中,其實也體現了我們最開始介紹的演進中的幾種訊息模型,比如我們是將訊息傳送到 Exchange 的而不是 Queue,訊息是以二進位制流的形式進行傳輸等等。當用 postman 請求到這個 controller 的方法時,我們可以在 RabbitMQ 的後端控制檯應用看到一條未確認的訊息,通過 GetMessage 即可看到其中的詳情,如下:

enter image description here

最後,我們將開發消費端的業務程式碼,如下:

 @Component
    public class CommonMqListener {

    private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private UserLogMapper userLogMapper;

    @Autowired
    private MailService mailService;

    /**
     * 監聽消費使用者日誌
     * @param message
     */
    @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeUserLogQueue(@Payload byte[] message){
        try {
            UserLog userLog=objectMapper.readValue(message, UserLog.class);
            log.info("監聽消費使用者日誌 監聽到訊息: {} ",userLog);
            //TODO:記錄日誌入資料表
            userLogMapper.insertSelective(userLog);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

將服務跑起來之後,我們即可監聽消費到上面 Queue 中的訊息,即當前使用者登入的資訊,而且,我們也可以看到“記錄使用者登入日誌”的邏輯是由一條異於主業務執行緒的非同步執行緒去執行的:

enter image description here

“非同步記錄使用者操作日誌”的案例我想足以用於詮釋上面所講的相關理論知識點了,在後續篇章中,由於篇幅限制,我將重點介紹其核心的業務邏輯!

場景二:非同步傳送郵件

傳送郵件的場景,其實也是比較常見的,比如使用者註冊需要郵箱驗證,使用者異地登入傳送郵件通知等等,在這裡我以 RabbitMQ 實現非同步傳送郵件。實現的步驟跟場景一幾乎一致!

1. 訊息模型的建立

enter image description here

2. 配置資訊的建立

enter image description here

3. 生產端

enter image description here

4. 消費端

enter image description here

RabbitMQ 實戰:併發量配置與訊息確認機制

實戰背景

對於訊息模型中的 listener 而言,預設情況下是“單消費例項”的配置,即“一個 listener 對應一個消費者”,這種配置對於上面所講的“非同步記錄使用者操作日誌”、“非同步傳送郵件”等併發量不高的場景下是適用的。但是在對於秒殺系統、商城搶單等場景下可能會顯得很吃力!

我們都知道,秒殺系統跟商城搶單均有一個共同的明顯的特徵,即在某個時刻會有成百上千萬的請求到達我們的介面,即瞬間這股巨大的流量將湧入我們的系統,我們可以採用下面一圖來大致體現這一現象:

enter image description here

當到了“開始秒殺”、“開始搶單”的時刻,此時系統可能會出現這樣的幾種現象:

  • 應用系統配置承載不了這股瞬間流量,導致系統直接掛掉,即傳說中的“宕機”現象;
  • 介面邏輯沒有考慮併發情況,資料庫讀寫鎖發生衝突,導致最終處理結果跟理論上的結果資料不一致(如商品存庫量只有 100,但是高併發情況下,實際表記錄的搶到的使用者記錄資料量卻遠遠大於 100);
  • 應用佔據伺服器的資源直接飆高,如 CPU、記憶體、寬頻等瞬間直接飆升,導致同庫同表甚至可能同 host 的其他服務或者系統出現卡頓或者掛掉的現象;

於是乎,我們需要尋找解決方案!對於目前來講,網上均有諸多比較不錯的解決方案,在此我順便提一下我們的應用系統採用的常用解決方案,包括:

  • 我們會將處理搶單的整體業務邏輯獨立、服務化並做叢集部署;
  • 我們會將那股巨大的流量拒在系統的上層,即將其轉移至 MQ 而不直接湧入我們的介面,從而減少資料庫讀寫鎖衝突的發生以及由於介面邏輯的複雜出現執行緒堵塞而導致應用佔據伺服器資源飆升;
  • 我們會將搶單業務所在系統的其他同資料來源甚至同表的業務拆分獨立出去服務化,並基於某種 RPC 協議走 HTTP 通訊進行資料互動、服務通訊等等;
  • 採用分散式鎖解決同一時間同個手機號、同一時間同個 IP 刷單的現象;

下面,我們用 RabbitMQ 來實戰上述的第二點!即我們會在“請求” -> "處理搶單業務的介面" 中間架一層訊息中介軟體做“緩衝”、“緩壓”處理,如下圖所示:

enter image description here

併發量配置與訊息確認機制

正如上面所講的,對於搶單、秒殺等高併發系統而言,如果我們需要用 RabbitMQ 在 “請求” - “介面” 之間充當限流緩壓的角色,那便需要我們對 RabbitMQ 提出更高的要求,即支援高併發的配置,在這裡我們需要明確一點,“併發消費者”的配置其實是針對 listener 而言,當配置成功後,我們可以在 MQ 的後端控制檯應用看到 consumers 的數量,如下所示:

enter image description here

其中,這個 listener 在這裡有 10 個 consumer 例項的配置,每個 consumer 可以預監聽消費拉取的訊息數量為 5 個(如果同一時間處理不完,會將其快取在 mq 的客戶端等待處理!)

另外,對於某些訊息而言,我們有時候需要嚴格的知道訊息是否已經被 consumer 監聽消費處理了,即我們有一種訊息確認機制來保證我們的訊息是否已經真正的被消費處理。在 RabbitMQ 中,訊息確認處理機制有三種:Auto - 自動、Manual - 手動、None - 無需確認,而確認機制需要 listener 實現 ChannelAwareMessageListener 介面,並重寫其中的確認消費邏輯。在這裡我們將用 “手動確認” 的機制來實戰使用者商城搶單場景。

1.在 RabbitMQConfig 中配置確認消費機制以及併發量的配置

enter image description here

2.訊息模型的配置資訊

enter image description here

3.RabbitMQ 後端控制檯應用檢視此佇列的併發量配置

enter image description here

4.listener 確認消費處理邏輯:在這裡我們需要開發搶單的業務邏輯,即“只有當該商品的庫存 >0 時,搶單成功,扣減庫存量,並將該搶單的使用者資訊記錄入表,非同步通知使用者搶單成功!”

enter image description here

enter image description here

緊接著我們採用 CountDownLatch 模擬產生高併發時的多執行緒請求(或者採用 jmeter 實施壓測也可以!),每個請求將攜帶產生的隨機數:充當手機號 -> 充當訊息,最終入搶單佇列!在這裡,我模擬了 50000 個請求,相當於 50000 手機號同一時間發生搶單的請求,而設定的產品庫存量為 100,這在 product 資料庫表即可設定

enter image description here

6.將搶單請求的手機號資訊壓入佇列,等待排隊處理

enter image description here

7.在最後我們寫個 Junit 或者寫個 Controller,進行 initService.generateMultiThread(); 呼叫模擬產生高併發的搶單請求即可

@RestController
    public class ConcurrencyController {

    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);

    private static final String Prefix="concurrency";

    @Autowired
    private InitService initService;

    @RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET)
    public BaseResponse robbingThread(){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        initService.generateMultiThread();
        return response;
    }}

8.最後,我們當然是跑起來,在控制檯我們可以觀察到系統不斷的在產生新的請求(執行緒)– 相當於不斷的有搶單的手機號湧入我們的系統,然後入佇列,listener 監聽到請求之後消費處理搶單邏輯!最後我們可以觀察兩張資料庫表:商品庫存表、商品成功搶單的使用者記錄表 - 只有當庫存表中商品對應的庫存量為 0、商品成功搶單的使用者記錄剛好 100 時 即表示我們的實戰目的以及效果已經達到了!!

enter image description here

總結:如此一來,我們便將 request 轉移到我們的 mq,在一定程度緩解了我們的應用以及介面的壓力!當然,實際情況下,我們的配置可能遠遠不只程式碼層次上的配置,比如我們的 mq 可能會做叢集配置、負載均衡、商品庫存的更新可能會考慮分庫分表、庫存更新可能會考慮獨立為庫存 Dubbo 服務並通過 Rest Api 非同步通訊互動並獨立部署等等。這些優化以及改進的目的其實無非是為了能限流、緩壓、保證系統穩定、資料的一致等!而我們的 MQ,在其中可以起到不可磨滅的作用,其字如其名:“訊息佇列”,而佇列具有 “先進先出” 的特點,故而所有進入 MQ 的訊息都將 “乖巧” 的在 MQ 上排好隊,先來先排隊,先來先被處理消費,由此一來至少可以避免 “瞬間時刻一窩蜂的 request 湧入我們的介面” 的情況!

附註:在用 RabbitMQ 實戰上述高併發搶單解決方案,其實我也在資料庫層面進行了優化,即在讀寫存庫時採用了“類似樂觀鎖”的寫法,保證:搶單的請求到來時有庫存,更新存庫時保證有庫存可以被更新!

彩蛋:本博文介紹了幾個典型的RabbitMQ實戰的業務場景,相關原始碼資料庫可以來這裡下載

(1)https://download.csdn.net/download/u013871100/10654482

(2)https://pan.baidu.com/s/1KUuz_eeFXOKF3XRMY2Jcew
學習過程有任何問題均可以與我交流,QQ:1974544863!下一篇博文將繼續典型應用場景實戰之死信佇列的應用。感興趣的童鞋可以關注一下我的微信公眾號!