1. 程式人生 > >RabbitMQ之訊息生產消費

RabbitMQ之訊息生產消費

前言

  前面學習了RabbitMQ的使用場景和基礎概念,現在來用一個RabbitMQ案例,來正式學習一下MQ的使用。

  這個案例分為兩個部分,訊息生產者和訊息消費者。模擬使用者註冊場景,生產者將使用者手機號傳送到MQ,消費者監聽MQ佇列,獲取使用者手機號傳送簡訊。

具體實現功能如下:

  • 使用RabbitMQ客戶端訊息confirm和redis,確保訊息正確傳送到MQ,不會產生丟失;
  • 結合訊息過期(TTL)和死信佇列(DLX)實現消費異常的延遲重試;
  • 訊息處理失敗達到最大重試次數之後,將其傳送到失敗佇列,等待人工處理。

正文

專案版本:Springboot2.0.4、RabbitMQ3.7.7
專案具體流程如下:

在這裡插入圖片描述

具體程式碼
1、定義一個訊息交換機和三個訊息佇列。
@Slf4j
@Configuration
public class RabbitConfig {
    /**
     * 宣告一個交換機
     * @return
     */
    @Bean
    public TopicExchange smsCaptchaExchange() {
        return new TopicExchange("sms_captcha", true, false);
    }
    
    /**
     * 正常的消費佇列
     * @return
     */
    @Bean
    public Queue smsCaptchaQueue() {
        return new Queue("
[email protected]
", true, false, false); } /** * 延時重試佇列 */ @Bean public Queue smsCaptchaRetryQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 10 * 1000); arguments.put("x-dead-letter-exchange", "sms_captcha"); arguments.put("x-dead-letter-routing-key", "sms.captcha"); return new Queue("
[email protected]
@retry", true, false, false, arguments); } /** * 處理失敗後存放訊息的佇列 * @return */ @Bean public Queue smsCaptchaFailedQueue() { return new Queue("[email protected]@failed", true, false, false); } /** * 將佇列與交換機繫結 * @return */ @Bean public Binding smsCaptchaBinding() { return BindingBuilder.bind(smsCaptchaQueue()).to(smsCaptchaExchange()).with("sms.captcha"); } @Bean public Binding smsCaptchaRetryBinding() { return BindingBuilder.bind(smsCaptchaRetryQueue()).to(smsCaptchaExchange()).with("sms.captcha.retry"); } @Bean public Binding smsCaptchaFailedBinding() { return BindingBuilder.bind(smsCaptchaFailedQueue()).to(smsCaptchaExchange()).with("sms.captcha.failed"); } }

這裡為了省事,只定義了一個交換機,這和三個交換機沒有區別。三個佇列分別為正常消費佇列、延遲重試佇列和訊息處理失敗後的存放佇列。

2、使用者請求註冊,並將註冊資訊包裝為message,傳送到MQ佇列中。
	@PostMapping(value = "register")
    public User register(@RequestBody User user) {
        String userJson = JSONObject.toJSONString(user);
        log.info("使用者註冊,註冊資訊:{}", userJson);

        String msgId = RandomUtil.getRandomUUID(20);
        Message message = MessageBuilder.withBody(userJson.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setCorrelationId(msgId)
                .build();
        CorrelationData correlationData = new CorrelationData(msgId);

        /**
         * 將msgId與Message的關係儲存起來,放到快取中.
         */
        redisTemplate.opsForHash().put("message", msgId, message);
        redisTemplate.opsForHash().put("exchange", msgId, "sms_captcha");
        redisTemplate.opsForHash().put("routingKey", msgId, "sms.captcha");

        //TODO 有些時候,由於某些原因傳送端接收不到MQ的confirm,這條訊息可能已經丟了,需要重新發送。
        //TODO 還需要定期查詢快取,檢視訊息的傳送時間距現在是否已經超過了一定時間,超過了這個時間但是訊息還存在快取中,則訊息很有可能已經丟了,傳送端沒有收到confirm,需要重新發送。
        //TODO 需要快取的資料有點雜,redis這麼設計好嗎?

        rabbitTemplate.convertAndSend("sms_captcha", "sms.captcha", message, correlationData);
        return user;
    }
  快取訊息

  可以看到上面程式碼中用redis快取了訊息、交換機和路由鍵。這是因為訊息傳送到MQ可能會失敗,一旦失敗訊息豈不是丟了?為避免這種情況,我們在業務程式碼中必須將message儲存起來,確保其進入了MQ再將儲存的訊息刪除。

  那麼我們是怎麼知道訊息是否被正確傳送到RabbitMQ伺服器了呢?不用擔心,RabbitMQ有事務和confirm機制,可以確保訊息正確到傳送到RabbitMQ伺服器。但是由於事務會大大降低RabbitMQ的訊息處理能力,所以一般使用的是非同步的confirm機制。

  生產者訊息確認機制

  首先我們開啟rabbitMQ的生產者訊息確認機制。

spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true

  其中,設定spring.rabbitmq.publisher-confirms=true之後,RabbitMQ會監聽交換機,我們可以編寫自己的回撥方法,一旦訊息傳送到MQ的交換機,回撥方法就會被呼叫。

/**
 * 當訊息傳送到交換機(exchange)時,回撥方法會被呼叫.
 */
template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                redisTemplate.opsForHash().delete("message", correlationData.getId());
                redisTemplate.opsForHash().delete("exchange", correlationData.getId());
                redisTemplate.opsForHash().delete("routingKey", correlationData.getId());
            } else {
                log.error("訊息傳送失敗, cause:{}", cause);
                Message message = (Message) redisTemplate.opsForHash().get("message", correlationData.getId());
                String exchange = (String) redisTemplate.opsForHash().get("exchange", correlationData.getId());
                String routingKey = (String) redisTemplate.opsForHash().get("routingKey", correlationData.getId());
                rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);

                //TODO 會不會由於某些原因,ack一直是false,導致訊息不斷地被重發。這樣需要加發送次數的限制
            }
        });

  上面回撥邏輯也很簡單,訊息成功進入交換機,就刪除快取中的訊息;交換機拒絕訊息或者其他原因,會再次嘗試傳送訊息。

  設定spring.rabbitmq.publisher-returns=true的作用是,開啟returnListener,當訊息從交換機路由佇列失敗時,會呼叫returnCallback的回撥方法。

  另外,使用returnCallback的前提必須設定mandatory屬性為true。mandatory屬性為true表示,如果交換機無法根據自身型別和路由鍵找到一個與之匹配的佇列,那麼RabbitMQ會將訊息返還給生產者,當設定為false時,RabbitMQ將直接丟棄訊息。

/**
 * 當訊息從交換機到佇列失敗時,回撥方法被呼叫。(若成功,則不呼叫)
 * 需要注意的是:該方法呼叫後,MsgSendConfirmCallBack中的confirm方法也會被呼叫,且ack = true
 */
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    log.error("message send to queue failed.");
    log.error("exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
    rabbitTemplate.convertAndSend(exchange, routingKey, message);

    //重發次數應該加限制,以免路由錯誤或者佇列不存在
});
  ack丟失怎麼辦?

  訊息生產者可以通過返回的ack資訊知道訊息是否被髮送到RabbitMQ伺服器,但是有時候由於網路故障或者其他原因,RabbitMQ伺服器返回給傳送者的ack丟失了,傳送端不知道訊息是傳送成功了還是失敗了,這該怎麼辦呢?

  答案就是重新發送,不管之前是否傳送成功,為了保證訊息必須被髮送到MQ伺服器,對於沒有迴應的訊息必須重新發送。解決方案就是在傳送訊息的時候對訊息的傳送時間也進行快取,定期掃描快取,如果超過一定時間訊息還在快取中,就重新發送此條訊息。

3、消費者監聽MQ佇列
@RabbitListener(queues = "[email protected]")
public void handleMessage(Message message, Channel channel) {
    log.info("handleMessage begin... the message is : {}", message);
    try {
        User user = JSON.parseObject(new String(message.getBody()), User.class);
        smsService.sendSms(user);
    } catch (Exception e) {
        log.error("handleMessage failed. error:", message, e);
        Long retryCount = getRetryCount(message.getMessageProperties());
        if (retryCount > 3) {
            log.info("將訊息置入失敗佇列,等待人工處理.");
        } else {
            log.info("將訊息置入延時重試佇列,重試次數:" + retryCount);
            rabbitTemplate.convertAndSend("sms_captcha_retry", "sms.captcha.retry", message);
        }
    }

    //手動acknowledge
    try {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (IOException e) {
        log.error("手動確認訊息失敗!");
        e.printStackTrace();
    }
}

  RabbitMQ中,正常的訊息在轉換成Dead Letter時會在其header中新增一個名為“x-death”的陣列,其中包含了變成Dead Letter的次數count。我們可以根據這個特點,控制消費重試次數。

private Long getRetryCount(MessageProperties properties) {
    Long retryCount = 0L;
    Map<String, Object> headers = properties.getHeaders();
    if (headers != null) {
        if (headers.containsKey("x-death")) {
            log.info("包含x-death頭部資訊.");
            List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
            if (deaths.size() > 0) {
                Map<String, Object> death = deaths.get(0);
                retryCount = (Long) death.get("count");
                System.out.println("當前重試次數:" + retryCount);
            }
        }
    }
    return retryCount;
}

在這裡插入圖片描述
  可以看到,消費者監聽MQ佇列,獲取訊息進行處理。如果訊息處理失敗,會將失敗的訊息傳送到延遲重試佇列中,再次消費。當重試次數達到三次,就不再重試,轉而將訊息傳送到失敗佇列,等待人工處理。

  確保訊息冪等性

  在訊息生產的時候提到過,由於ack可能會丟失,會導致訊息生產者存在訊息超發情況,所以消費者必須確保訊息的冪等性,以避免訊息的重複消費。

總結

參考部落格:

初寫部落格,語言組織不是很好,大家見諒。