1. 程式人生 > >Spring Boot (25) RabbitMQ消息隊列

Spring Boot (25) RabbitMQ消息隊列

res bin context 程序 了解 可靠 版本 prope message

MQ全程(Message Queue)又名消息隊列,是一種異步通訊的中間件。可以理解為郵局,發送者將消息投遞到郵局,然後郵局幫我們發送給具體的接收者,具體發送過程和時間與我們無關,常見的MQ又kafka、activemq、zeromq、rabbitmq等等。

RabbitMQ

  RabbitMQ是一個遵循AMQP協議,由面向高並發的erlang語言開發而成,用在實時的對可靠性要求比較高的消息傳遞上,支持多種語言客戶端,支持延遲隊列。

基礎概念

  Broker:消息隊列的服務器實體

  Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列

  Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列

  Binding:綁定,它主要是把exchange和queue按照路由規則綁定起來

  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞

  vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離

  producer:消息生產者,投遞消息的程序

  consumer:消息消費者,接收消息的程序

  channel:消息通道,在客戶端的每個連接裏,可以建立多個channel,每個channel代表一個會話任務

常見應用場景

  1.郵箱發送:用戶註冊後投遞消息到rabbitmq中,由消息的消費方異步的發送郵件,提升系統響應速度。

  2.流量削鋒:一般在秒殺活動中應用廣泛,秒殺會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入消息隊列。用於控制活動人數,將超過此一定閥值的訂單直接丟棄。緩解端時間的高流量壓垮應用。

  3.訂單超時:利用rabbitmq的延遲隊列,可以很簡單的實現訂單超功能,比如用戶在下單後30分鐘未支付取消訂單。

導入依賴

在pom.xml中添加spring-boot-starter-amqp的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

屬性配置

在application.yml中配置rabbitmq的相關信息,這裏配置了手動的ACK開關

spring:
  rabbitmq:
    username: david
    password: 123456
    host: localhost
    port: 5672
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #手動ACK 不開啟自動ACK模式,目的是防止報錯後為正確處理消息丟失 默認為none

定義隊列

  

package com.spring.boot.utils;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public static final String DEFAULT_BOOK_QUEUE = "dev.book.register.default.queue";
    public static final String MANUAL_BOOK_QUEUE = "dev.book.register.manual.queue";

    @Bean
    public Queue defaultBookQueue(){
        //參數1 隊列名,參數2 是否持久化處理
        return new Queue(DEFAULT_BOOK_QUEUE,true);
    }
    @Bean Queue manualBookQueue(){
        return new Queue(MANUAL_BOOK_QUEUE,true);
    }
}

實體類

package com.spring.boot.bean;

import java.io.Serializable;

public class Book implements Serializable{
    private Integer id;
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

控制器

新建一個bookController,用於消息發送

package com.spring.boot.controller;

import com.spring.boot.bean.Book;
import com.spring.boot.utils.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/books")
public class BookController {

    //spring boot 2.x版本推薦構造器註入 而不是屬性註入
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @GetMapping("/defaultMessage")
    public void defaultMessage() {
        Book book = new Book();
        book.setId(1);
        book.setName("hello RabbitMQ");
        this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);
        this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);
    }

}

消息消費者

默認情況下 spring-boot-data-amqp是自動ACK機制,就意味著MQ會在消息消費完畢後自動幫我們去ACK,這樣依賴就存在這樣一個問題:如果報錯了,消息不會丟失,會無限循環消費,很容易把磁盤空間耗完,雖然可以配置消費的次數但這種做法也不太好。目前比較推薦的就是我們手動ACK然後將消費錯誤的消息轉移到其他的消息隊列中,做補償處理

package com.spring.boot.handler;

import com.rabbitmq.client.Channel;
import com.spring.boot.bean.Book;
import com.spring.boot.utils.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class BookHandler {

    @RabbitListener(queues={RabbitConfig.DEFAULT_BOOK_QUEUE})
    public void listenerAutoACK(Book book, Message message,Channel channel) throws IOException {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.println("ListenerAutoAck 監聽到的消息:" + book.toString());
            // TODO 通知MQ 已經被消費完成 可以ACK了
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            //TODO 處理失敗,重新壓入MQ
            channel.basicRecover();
            e.printStackTrace();
        }
    }

    @RabbitListener(queues={RabbitConfig.MANUAL_BOOK_QUEUE})
    public void listenerManualACK(Book book,Message message,Channel channel){
        System.out.println("listenerManualACK監聽到的消息:"+book.toString());
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            //如果報錯了 可以進行容錯處理,比如轉移當前消息進入其他隊列
            e.printStackTrace();
        }
    }
}

測試:啟動項目 輸入路徑 http://localhost:8088/books/defaultMessage

技術分享圖片

技術分享圖片

Spring Boot (25) RabbitMQ消息隊列