1. 程式人生 > >RabbitMQ詳解以及和SpringBoot整合

RabbitMQ詳解以及和SpringBoot整合

各訊息元件的適用場景

  • ActiveMQ ActiveMQ 是一款比較早期的訊息元件,由Apache開源出來的,它能滿足吞吐量一般的業務場景,但是對於高併發場景,效能較差。
  • Kafka Kafka追求高吞吐量的特性,它一開始使用於日誌的收集。缺點是訊息可靠性支援較少,適合產生大量資料的網際網路服務的資料收集業務。
  • RocketMQ RocketMQ 早期由阿里團隊開發的,現在升級為Apache的頂級專案。純 Java 開發,具有高吞吐量、高可用性、適合大規模分散式系統應用的特點。缺點是,有些功能不是開源的,如訊息事務。
  • RabbitMQ RabbitMQ 是由 Erlang 語言編寫的,適合對資料一致性、穩定性和可靠性要求很高的場景,對效能和吞吐量的要求還在其次。

AMQP協議

AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。 AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

1、AMQP模型

AMQP協議模型 訊息是直接由生產者傳送到Exchange中的,然後Exchange和Message Queue之間通過某種規則關聯起來(後面會講),生產者是直接訂閱Message Queue的,只要Queue裡面有訊息,就會被消費者消費。這樣就實現了生產者和消費者之間的低耦合性。

2、AMQ核心概念

  • Channel:網路通訊,幾乎所有的操作都在Channel中進行,Channel是進行訊息讀寫的通道。客戶端可建立多個Channel,每個Channel代表一個會話任務。
  • Virtual host:虛擬地址,用於進行邏輯隔離,最上層的訊息路由。
  • Exchange:交換機,接收訊息,根據路由鍵轉發訊息到繫結的佇列。
  • Routing key:一個路由規則,虛擬機器可用它來確定如何路由一個特定訊息
  • Queue:也稱 Message Queue,訊息佇列,儲存訊息,並將它們轉發給消費者。
  • Binding:Exchange和Queue之間的虛擬連線,binding 中可以指定 routing key。

RabbitMQ

RabbitMQ 是基於AMQP協議開發的訊息元件.。

1、RabbitMQ的整體架構

RabbitMQ的整體架構 生產者傳送訊息到Exchange中的,然後Exchange和Message Queue之間通過Routing key建立路由規則,把訊息傳送給特定的Queue,然後訊息推送給訂閱了該Queue的消費者。 RabbitMQ訊息流轉圖

生產者傳送訊息時,需要指定兩個引數,Exchange和Routing key,如果Exchange不指定(即為空),則會採用預設的AMQP default的Exchange,其會根據Routing key的值,路由到對應的 Queue。

2、Exchange

Exchange有四種類型,分別是Direct,Topic,Fanout ,Header。

  • Direct Exchange 所有傳送到Direct Exchange的訊息被轉發到RouteKey中指定的Queue。 注意:Direct模式可以使用RabbitMQ自帶的Exchange:AMQP default,所以不需要將Exchange進行任何繫結(binding)操作,訊息傳遞時,RouteKey必須完全匹配才會被佇列接收,否則該訊息會被拋棄。 exchange在和queue進行binding時會設定routingkey:
channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

然後我們在將訊息傳送到exchange時會設定對應的routingkey:

channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

在direct型別的exchange中,只有這兩個routingkey完全相同,exchange才會選擇對應的binging進行訊息路由。 具體流程如下: Direct 流程

  • Topic Exchange 此型別exchange和上面的direct型別差不多,但direct型別要求routingkey完全相等,這裡的routingkey可以有萬用字元:“*”,“#”,其中 “*” 表示匹配一個單詞, “#”則表示匹配沒有或者多個單詞。 Topic 流程 舉個栗子: Topic Exchange
  • Fanout Exchange 此exchange的路由規則很簡單,直接將訊息路由到所有繫結的佇列中,無須對訊息的routingkey進行匹配操作。 Fanout 流程
  • Header Exchange 此型別的exchange和以上三個都不一樣,其路由的規則是根據header來判斷,其中的header就是以下方法的arguments引數:
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
                    exchange: "agreements",
                    routingKey: string.Empty,
                    arguments: aHeader);

其中的x-match為特殊的header,可以為all則表示要匹配所有的header,如果為any則表示只要匹配其中的一個header即可。 在釋出訊息的時候就需要傳入header值:

Properties properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;

以上就是exchange 型別的總結,一般來說direct和topic用來具體的路由訊息,如果要用廣播的訊息一般用fanout的exchange。 header型別用的比較少,但還是知道一點好。

RabbitMQ與SpringBoot整合

  • 新增 amqp 依賴
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
  • 配置檔案 application.properties
# rabbitmq
spring.rabbitmq.host=node2
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
# 對 rabbitmqTemplate 進行監聽,當訊息由於server的原因無法到達queue時,就會被監聽到,以便執行ReturnCallback方法
# 預設為false,Server端會自動刪除不可達訊息
spring.rabbitmq.template.mandatory=true

# 消費端手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 併發消費
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
  • 生產者

/**
 * @author K. L. Mao
 * @create 2018/9/20
 */
@Service
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 用於監聽Server端給我們返回的確認請求,訊息到了exchange,ack 就返回true
     */
    private final RabbitTemplate.ConfirmCallback  confirmCallback = (correlationData, ack, cause) -> {
        System.out.println("correlationData:" + correlationData);
        System.out.println("ack:" + ack);
        if (!ack){
            System.out.println("補償處理...");
        }
    };

    /**
     * 監聽對不可達的訊息進行後續處理;
     * 不可達訊息:指定的路由key路由不到。
     */
    private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText,
    exchange, routingKey) -> System.out.println("return exchange:" + exchange + ", routingKey:" + routingKey +
            ", replyText:" + replyText);

    /**
     * 傳送訊息
     * @param order
     */
    public void sendOrder(Order order) {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData cd = new CorrelationData();
        // 訊息唯一標識
        cd.setId(UUID.randomUUID().toString().replace("-","") + DateUtils.formatDate(new Date(), "yyyyMMdd"));
        rabbitTemplate.convertAndSend("exchange-2", "springboot.abc", order, cd);
    }
}
  • 消費者
/**
 * @author K. L. Mao
 * @create 2018/9/20
 */
@Service
public class RabbitConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-2", durable = "true"),
            exchange = @Exchange(value = "exchange-2",
                    durable = "true", type = "topic",
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.#")
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, @Headers Map<String, Object> properties, Channel channel) throws IOException {
        System.out.println("消費端 order:" + order);
        // deliveryTag: 確認訊息的條數,一般為1
        Long deliveryTag = (Long) properties.get(AmqpHeaders.DELIVERY_TAG);
        System.out.println("deliveryTag:" + deliveryTag);
        // 限流處理:訊息體大小不限制,每次限制消費一條,只作用於該Consumer層,不作用於Channel
        channel.basicQos(0, 1, false);
        // 手工ACK,不批量ack
        channel.basicAck(deliveryTag, false);
    }

可以直接使用@RabbitListener註解,宣告Queue和Exchange以及Binding關係。 消費端接收的訊息是Message物件,結構為:

package org.springframework.messaging;

public interface Message<T> {
    T getPayload();

    MessageHeaders getHeaders();
}

我們可以直接通過註解@Payload 獲取我們傳輸的資料,通過註解@Headers 獲取訊息請求頭。 這裡我們增加了訊息限流的功能,防止生產過多,導致消費者消費吃力的情況: channel.basicQos(0, 1, false):0表示對訊息的大小無限制,1表示每次只允許消費一條,false表示該限制不作用於channel。 同時,我們這裡採用手工ACK的方式,因為我們配置檔案配置了spring.rabbitmq.listener.simple.acknowledge-mode=manual: channel.basicAck(deliveryTag, false):deliveryTag表示處理的訊息條數(一般為1),從heaers中取,false表示不批量ack。

DLX

  • DLX定義 DLX為Dead Letter Exchange,死信佇列。當一個訊息在一個佇列中變成死信(dead message)之後,它能重新publish到另一個Exchange,這個Exchange就是DLX。
  • 訊息變成死信的幾種情況 1、訊息被拒絕,ack為false,並且 requeue=false; 2、訊息TTL(Time To Live)過期,指訊息達到了過期時間; 3、佇列達到最大長度。
  • 死信佇列程式碼演示: 1、宣告一個正常的Exchange、Queue以及Binding
    /**
     * 宣告一個正常交換機
     * @return
     */
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange("NORMAL_EXCHANGE").durable(true).build();
    }
    /**
     * 宣告一個正常佇列,並且配置正常交換機
     * @return
     */
    @Bean
    public Queue normalQueue(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 宣告 死信交換機
        args.put("x-dead-letter-exchange", DL_EXCHANGE);
        return QueueBuilder.durable("NORMAL_QUEUE").withArguments(args).build();
    }
    /**
     * 繫結,這個是正常的佇列,需要有路由規則
     * @return
     */
    @Bean
    public Binding normalBinding(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("NORMAL.#").noargs();
    }

2、宣告一個死信佇列,來接收死信訊息

    /**
     * 宣告一個死信交換機,不一定為Topic Exchange, 和交換機型別無關
     * @return
     */
    @Bean
    public Exchange deadLetterExchange(){
        return ExchangeBuilder.topicExchange(DL_EXCHANGE).durable(true).build();
    }
    /**
     * 死信佇列,即requeue、過期(TTL)、超過佇列容量,就會被轉發到此佇列
     * @return
     */
    @Bean
    public Queue deadLetterQueue(){
        return QueueBuilder.durable("DX_QUEUE").build();
    }
    /**
     * 死信佇列進行繫結,一般routingKey為"#",即所有繫結這個Exchange的Queue都能被路由到
     * @return
     */
    @Bean
    public Binding deadLetterBinding(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("#").noargs();
    }

3、對傳送的訊息設定TTL,模擬DLX場景

    /**
     * 傳送死信佇列
     * @param msg
     */
    public void sendDlx(String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 設定編碼
            messageProperties.setContentEncoding("UTF-8");
            // 設定過期時間 10s,到了過期時間還沒被消費,則會進入死信佇列
            messageProperties.setExpiration("10000");
            return message;
        };
        rabbitTemplate.convertAndSend("NORMAL_EXCHANGE", "NORMAL.AAA", msg,
                messagePostProcessor, correlationData);
    }

這裡設定了10s的過期時間,訊息一開始是在NORMAL-QUEUE佇列上,如果10s之內還沒被消費,則會進入DX_QUEUE(死信佇列)佇列。

可靠性投遞解決方案

可靠性投遞,即保證訊息的100%被消費。目前,網際網路大廠主要的解決方案,有兩種: 1、訊息落庫,對訊息狀態進行打標 2、訊息的延遲投遞,做二次確認,回撥檢查

  • 訊息落庫 訊息落庫方案
  • 延遲投遞 延遲投遞方案 延遲投遞方案相比訊息落庫方案,優勢是在於把msg db剝離了核心業務,在大業務量的場景中,會減少核心業務的資料庫壓力(少了一次msg db的資料插入)。