1. 程式人生 > >Springboot整合Rabbitmq實現延時消費,並實現可靠的訊息處理

Springboot整合Rabbitmq實現延時消費,並實現可靠的訊息處理

一、Rabbitmq簡介

1.1 rabbitmq 架構

1.2 rabbitmq相關元件介紹

  • exchange: 交換機,主要用來將生產者傳送的訊息路由給伺服器中的佇列。
  • routing-key: 訊息路由的key,生產者在將訊息發到到exchange的時候,需要指定routing-key,這樣exchange才知道將這條訊息路由給哪些佇列。
  • message: 訊息體,主要由訊息頭和訊息body組成,訊息頭包括是否持久化,routing-key等。
  • binding 將訊息佇列和exchange進行繫結,一個繫結就是基於路由鍵將交換機和訊息佇列連線起來。
  • publisher: 訊息生產者,可以理解為一個傳送訊息的客戶端應用程式。
  • consumer: 訊息消費者,可以理解為一個從queue消費訊息的應用程式。
  • queue : 訊息佇列,用來儲存訊息直到傳送給消費者,相當於一個容器,一個容器可以放N條訊息等待被消費。
  • channel : 通道,不管是傳送訊息還是訂閱訊息都是通過通道傳送出去的,相當於複用TCP連線,因為每次都建立一個TCP連線時非常耗資源的。

1.3 rabbimq常用的3種exchange

1.3.1 fanout exchange

不處理路由鍵,你只需要簡單的將佇列繫結到交換機上。一個傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。


1.3.2 direct exchange

處理路由鍵。需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。


1.3.3 topic exchange

將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。比如符號”#”匹配一個或多個詞


1.4 rabbitmq訊息的可靠傳輸

一般對於業務不允許由訊息丟失的場景來說,訊息需要保證被至少傳輸一次(消費端做冪等),在rabbitmq裡面,可能存在幾種情況,訊息會出現丟失,

  • 生產者弄丟了訊息,即生產者在將訊息傳送到exchange的過程中,可能由於網路等原因半路就丟失了,這個時候可以採用rabbitmq的事務機制,也就是在訊息傳送之前,開啟事務,如果沒有傳送到exchange,生產者收到報錯後回滾事務,然會重新發送,如果訊息正常到達,則提交事務。但是事務相當於同步操作,整體效能不高,所以一般在生產端建議開啟Confirm機制。
  • rabbitmq丟了訊息,這個時候需要開啟rabbitmq的持久化功能,首先是佇列開啟持久化,同時傳送的訊息的deliveryMode也設定為持久的,這樣,訊息就會持久化到磁碟了,即使rabbitmq掛了,重啟後也能恢復資料,配合客戶端的confirm機制,就算是在還沒持久化到磁碟之前,rabbitmq掛了,生產者會重發,在重試次數範圍內,只要持久化到磁碟了,就會返回ack。
  • 消費者丟了訊息,這個需要關閉rabbitmq的自動ack功能,因為自動ack的話,只要消費者收到了就會ack,並不關心消費者是否成功消費, 消費者ack需要由應用自己決定什麼時候返回ack。

二、Demo演示

2.1 maven匯入rabbitmq依賴包

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- rabbitmq starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2.2 新建application


    package com.yunsom.springboot.rabbitmq;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    @SpringBootApplication
    public class SpringbootMessageApplication {
      public static void main(String[] args) {
        SpringApplication.run(SpringbootMessageApplication.class, args);
      }
    }

2.3 新建rabbitmq配置檔案

在springboot裡面,因為使用了其封裝好的rabbitmq,在實際的開發中,根據不同的exchange模式,需要自己申明不同的exchange,下面以direct exchange模式為例子進行說明

    package com.yunsom.springboot.rabbitmq.config;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitMqConfig {
      /**
       * 死信佇列 交換機識別符號
       */
      private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
      /**
       * 死信佇列交換機繫結鍵識別符號
       */
      private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
      /**
       * 死信佇列裡面訊息的超時時間
       */
      private static final String X_MESSAGE_TTL = "x-message-ttl";
      /**
       * 宣告交換機,支援持久化.
       * rabbitmq常用幾種exchange,比如direct, fanout, topic,可根據具體業務需求配置
       * 命名規範參考 scm3.services,scm3.services.retry,scm3.services.failed
       * @return the exchange
       */
      @Bean("scm3.materials")
      public Exchange directExchange() {
        //.durable(true) exchange的持久化
        return ExchangeBuilder.directExchange("scm3.materials").durable(true).build();
      }
      @Bean("scm3.materials.retry")
      public Exchange retryDirectExchange() {
        return ExchangeBuilder.directExchange("scm3.materials.retry").durable(true).build();
      }
      @Bean("scm3.materials.fail")
      public Exchange failDirectExchange() {
        return ExchangeBuilder.directExchange("scm3.materials.fail").durable(true).build();
      }
      /**
       * ##########################################供需關係服務-宣告queue#####################################################
       */
      /**
       * 宣告一個佇列 .{供需關係主佇列} 佇列名稱參考 【服務名稱】@訂閱服務標識 如
       * [email protected]供需關係,[email protected]供需關係@retry,[email protected]供需關係@failed
       * [email protected]採購計劃,[email protected]採購計劃@retry,@[email protected]採購計劃@failed
       * 
       * @return the queue
       */
      @Bean("[email protected]")
      public Queue directQueue() {
        return QueueBuilder.durable("[email protected]").build();
      }
      /**
       * 供需關係 重試佇列
       * 
       * @return
       */
      @Bean("[email protected]@retry")
      public Queue retryDirectQueue() {
        Map<String, Object> args = new ConcurrentHashMap<>(3);
        // 將訊息重新投遞到exchange中
        args.put(DEAD_LETTER_QUEUE_KEY, "scm3.materials");
        args.put(DEAD_LETTER_ROUTING_KEY, "[email protected]");
        //在佇列中延遲30s後,訊息重新投遞到x-dead-letter-exchage對應的佇列中,routingkey是自己配置的
        args.put(X_MESSAGE_TTL, 30 * 1000);
        return QueueBuilder.durable("[email protected]@retry").withArguments(args).build();
      }
      /**
       * 供需關係 失敗佇列
       * 
       * @return
       */
      @Bean("[email protected]@failed")
      public Queue failDirectQueue() {
        return QueueBuilder.durable("[email protected]@failed").build();
      }
      /**
       * ###########################################供需關係結束###############################################
       */
      /** ########################################使用者服務開始############################################ */
      /**
       * @return the queue
       */
      @Bean("[email protected]")
      public Queue userDirectQueue() {
        return QueueBuilder.durable("[email protected]").build();
      }
      /**
       * 使用者服務 重試佇列
       * 
       * @return
       */
      @Bean("[email protected]@retry")
      public Queue userRetryDirectQueue() {
        Map<String, Object> args = new ConcurrentHashMap<>(3);
        args.put(DEAD_LETTER_QUEUE_KEY, "scm3.materials");
        args.put(DEAD_LETTER_ROUTING_KEY, "[email protected]");
        args.put(X_MESSAGE_TTL, 30 * 1000);
        return QueueBuilder.durable("[email protected]@retry").withArguments(args).build();
      }
      /**
       * 供需關係 失敗佇列
       * 
       * @return
       */
      @Bean("[email protected]@failed")
      public Queue userFailDirectQueue() {
        return QueueBuilder.durable("[email protected]@failed").build();
      }
      /** #####################################使用者服務結束################################################ */
      /**
       * 以下是消費者需要處理的 通過繫結鍵(rounting key) 將指定佇列繫結到一個指定的交換機 .要求該訊息與一個特定的路由鍵完全匹配
       * @param queue the queue
       * @param exchange the exchange
       * @return the binding
       */
      /**
       * ######################################供需關係繫結###################################################
       */
      @Bean
      public Binding directBinding(@Qualifier("[email protected]") Queue queue,
          @Qualifier("scm3.materials") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("direct_rounting_key").noargs();
      }
      @Bean
      public Binding directQueueBinding(@Qualifier("[email protected]") Queue queue,
          @Qualifier("scm3.materials") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
      @Bean
      public Binding retryDirectBinding(@Qualifier("[email protected]@retry") Queue queue,
          @Qualifier("scm3.materials.retry") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
      @Bean
      public Binding failDirectBinding(@Qualifier("[email protected]@failed") Queue queue,
          @Qualifier("scm3.materials.fail") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
      /**
       * ######################################使用者服務繫結###################################################
       */
      @Bean
      public Binding userDirectBinding(@Qualifier("[email protected]") Queue queue,
          @Qualifier("scm3.materials") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("direct_rounting_key").noargs();
      }
      @Bean
      public Binding userDirectQueueBinding(@Qualifier("[email protected]") Queue queue,
          @Qualifier("scm3.materials") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
      @Bean
      public Binding userRetryDirectBinding(@Qualifier("[email protected]@retry") Queue queue,
          @Qualifier("scm3.materials.retry") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
      @Bean
      public Binding userFailDirectBinding(@Qualifier("[email protected]@failed") Queue queue,
          @Qualifier("scm3.materials.fail") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("[email protected]").noargs();
      }
    }

2.3 新建生產者

生產者只需要關注將某個訊息發到某個exchange上,並指定routingkey即可。

    package com.yunsom.springboot.rabbitmq.sender;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.annotation.PostConstruct;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import lombok.extern.slf4j.Slf4j;
    @Component
    @Slf4j
    public class MessageSender {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Value("${java.rabbitmq.send.service.exchange}")
      private String sendExchange;
      @Value("${java.rabbitmq.send.service.rountkey}")
      private String rountKey;
      /**
       * demo級別,先本地快取,真正實現可考慮用redis 如果是放到redis中,有可能exchange一直不給生產者反饋{比如rabbitmq掛了,這種只能重啟rabbitmq}
       * 如果是網路原因,恢復時間應該很快,下次重發的時候網路好了,進行正常的ack 在redis裡面,不能設定訊息的過期時間,可以用分散式定時任務,每隔一段時間
       * 去查redis裡面有沒有被訊息確認的訊息,然後取出來重新發送(存的時候,就要存如當前訊息被髮送的時間)
       */
      Map<String, Message> messageMap = new ConcurrentHashMap<String, Message>();
      /**
       * confirm機制,當生產者傳送訊息給exchange的時候,如果沒有發到到exchange,會收不到ack,
       * 如果送達到了exchange,會回撥該方法,如果訊息,佇列,交換機都設定了持久化,那麼訊息 在持久化到磁碟後,才會ack給生產者,也就是說生產者收到了ack後,訊息肯定是可靠的了,已經
       * 到磁碟了
       */
      @PostConstruct
      public void init() {
        /**
         * 如果設定了spring.rabbitmq.publisher-confirms=true(預設是false),生產者會收到rabitmq-server返回的ack
         * 這個回撥方法裡面沒有原始訊息,相當於只是一個通知作用
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
          if (null != messageMap && !messageMap.isEmpty()) {
            if (null != cause && !"".equals(cause)) {
              System.out.println("失敗原因:" + cause);
              // 重發的時候到redis裡面取,消費成功了,刪除redis裡面的msgId
              Message message = messageMap.get(correlationData.getId());
              rabbitTemplate.convertAndSend(sendExchange, rountKey, message, correlationData);
            } else {
              messageMap.remove(correlationData.getId());
              System.out.println("訊息唯一標識:" + correlationData + ";確認結果:" + ack);
            }
          }
        });
        // rabbitTemplate.setMandatory(true);如果設定了mandatory=true(預設為false)
        // 這樣設定的話,如果訊息到達exchange後,沒有queue與其繫結,會將訊息返給生產者,生產者會
        // 回撥這個方法
        rabbitTemplate
            .setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
              System.out.println("send message failed: " + replyCode + " " + replyText);
            });
      }
      /**
       * 同步傳送訊息,效率低
       * 
       * @param receiveMessage
       */
      public void syncSend(String receiveMessage) {
        Message message = MessageBuilder.withBody(receiveMessage.getBytes())
            .setContentType("application/json").build();
        // 同步等待的超時時間
        rabbitTemplate.setReplyTimeout(3 * 1000);
        Object receiveObject = rabbitTemplate.convertSendAndReceive(sendExchange, rountKey, message);
        System.out.println("生產者收到消費者返回的訊息:" + receiveObject);
      }
      /**
       * 非同步傳送訊息, 非同步傳送,效能更高,但是無法知道訊息是否傳送到了exchange,可以開啟生產端的重試機制
       * spring.rabbitmq.template.retry.enabled=true,預設是false,另外 重試機制預設是重試3次,每次間隔一定時間再次重試,
       * 
       * @param receiveMessage
       */
      public void asyncSend(String receiveMessage) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(msgId);
        // 預設訊息就是持久化的 MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
        Message message = MessageBuilder.withBody(receiveMessage.getBytes())
            .setContentType("application/json").setCorrelationId(msgId).build();
        messageMap.put(msgId, message);
        // 第4個引數是關聯釋出確定的引數
        try {
          // rabbitTemplate.setMandatory(true);
          // 如果不開啟訊息回撥,可以不要第4個引數,因為在回撥時,可以拿到這個correlationData
          // 最後會呼叫到 void basicPublish(String exchange, String routingKey, boolean mandatory,
          // BasicProperties props, byte[] body)
          // throws IOException;
          rabbitTemplate.convertAndSend(sendExchange, rountKey, message, correlationData);
          log.info("生產者傳送訊息:" + receiveMessage + ",訊息Id:" + msgId);
        } catch (AmqpException e) {
          log.info("生產者傳送訊息:" + receiveMessage + "發生了異常:" + e.getMessage());
        }
      }
    }

2.4 新建消費者

2.4.1 使用者服務消費者

    package com.yunsom.springboot.rabbitmq.consumer;
    import java.io.IOException;
    import javax.annotation.Resource;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import com.rabbitmq.client.Channel;
    import com.yunsom.springboot.rabbitmq.service.MessageHandler;
    import com.yunsom.springboot.rabbitmq.util.RabbitMqUtil;
    import lombok.extern.slf4j.Slf4j;
    /**
     * deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 註冊後,會建立起一個 Channel , RabbitMQ 會用 basic.deliver
     * 方法向消費者推送訊息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條訊息的唯一標識 ID,是一個單調遞增的正整數,
     * delivery tag 的範圍僅限於 Channel
     */
    @Component
    @Slf4j
    public class UserRabbitMqConsumer {
      @Resource
      private RabbitTemplate rabbitTemplate;
      @Resource
      private MessageHandler messageHander;
      @Value("${java.rabbitmq.consumer.service.retry.exchange}")
      private String userServiceListenerRetryExchange;
      @Value("${java.rabbitmq.consumer.service.fail.exchange}")
      private String userServiceListenerFailExchange;
      @Value("${java.rabbitmq.consumer.service.user.retry.routingkey}")
      private String userSerivceRetryOrFailRoutingKey;
      @SuppressWarnings("unused")
      @RabbitListener(queues = {"[email protected]"})
      public void consumerMessage(Message message, Channel channel) throws IOException {
        try {
          /**
           * 消費者自己做冪等
           */
          messageHander.HandlerMessage(message, "user");
          /** 手動丟擲異常,測試訊息重試 */
          int i = 5 / 0;
        } catch (Exception e) {
          long retryCount = RabbitMqUtil.getRetryCount(message.getMessageProperties());
          CorrelationData correlationData =
              new CorrelationData(message.getMessageProperties().getCorrelationId());
          Message newMessage = null;
          if (retryCount >= 3) {
            /** 如果重試次數大於3,則將訊息傳送到失敗佇列等待人工處理 */
            newMessage = RabbitMqUtil.buildMessage(message);
            try {
              rabbitTemplate.convertAndSend(userServiceListenerFailExchange,
                  userSerivceRetryOrFailRoutingKey, newMessage, correlationData);
              log.info("使用者體系服務消費者消費訊息在重試3次後依然失敗,將訊息傳送到fail佇列,傳送訊息:" + new String(newMessage.getBody()));
            } catch (Exception e1) {
              log.error("使用者體系服務訊息在傳送到fail佇列的時候報錯:" + e1.getMessage() + ",原始訊息:"
                  + new String(newMessage.getBody()));
            }
          } else {
            newMessage = RabbitMqUtil.buildMessage2(message);
            try {
              /** 如果當前訊息被重試的次數小於3,則將訊息傳送到重試佇列,等待重新被消費{延遲消費} */
              rabbitTemplate.convertAndSend(userServiceListenerRetryExchange,
                  userSerivceRetryOrFailRoutingKey, newMessage, correlationData);
              log.info("使用者服務消費者消費失敗,訊息傳送到重試佇列;" + "原始訊息:" + new String(newMessage.getBody()) + ";第"
                  + (retryCount+1) + "次重試");
            } catch (Exception e1) {
              // 如果訊息在重發的時候,出現了問題,可用nack,經過開發中的實際測試,當訊息回滾到訊息佇列時,
              // 這條訊息不會回到佇列尾部,而是仍是在佇列頭部,這時消費者會立馬又接收到這條訊息,進行處理,接著丟擲異常,
              // 進行回滾,如此反覆進行。這種情況會導致訊息佇列處理出現阻塞,訊息堆積,導致正常訊息也無法執行
              // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
              // 改為重新發送訊息,經過多次重試後,如果重試次數大於3,就不會再走這,直接丟到了fail queue等待人工處理
              log.error("訊息傳送到重試佇列的時候,異常了:" + e1.getMessage() + ",重新發送訊息");
            }
          }
        } finally {
          /**
           * 關閉rabbitmq的自動ack,改為手動ack 1、因為自動ack的話,其實不管是否成功消費了,rmq都會在收到訊息後立即返給生產者ack,但是很有可能 這條訊息我並沒有成功消費
           * 2、無論消費成功還是消費失敗,都要手動進行ack,因為即使消費失敗了,也已經將訊息重新投遞到重試佇列或者失敗佇列
           * 如果不進行ack,生產者在超時後會進行訊息重發,如果消費者依然不能處理,則會存在死迴圈
           */
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
      }
    }

2.4.2 供需關係服務消費者

和使用者服務消費者一樣,每個消費者只關注本身的業務邏輯,消費異常的處理都是一樣的。

2.5 application.properties配置

    server.port=6006
    spring.application.name=springboot_rabbitmq
    #rabbitmq config
    #spring.rabbitmq.addresses=單機,叢集多個地址以,號隔開
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=tanjie
    spring.rabbitmq.password=tanjie666
    spring.rabbitmq.virtual-host=/
    #開啟rabbitmq的confirm機制,如果訊息沒有到達exchange,或者exchange在ack生產者的時候,生產者沒有收到,那麼生產者會進行重發
    #如果設定為false,經過測試,不會進行回撥
    spring.rabbitmq.publisher-confirms=true
    #開啟rabbitmq的生產端{template}重試機制,預設是false,預設重試3次
    spring.rabbitmq.template.retry.enabled=true
    #關閉訊息的強制路由,當生產者將訊息發到exchange,如果沒有queue進行繫結, 禁止broker傳送basic.return,表示當前訊息無人消費
    #因為我們配置了訊息的永續性,就算沒有消費者,訊息也在磁碟,預設就是false
    spring.rabbitmq.template.mandatory=false
    #開啟rabbitmq的消費者{listener}重試機制,該重試機制需要設定為自動ack,本次方案和PHP保持一致,如果消費者消費失敗後,手動將訊息放入死信佇列等待訊息被重新消費
    # 預設該配置為false,設定為true的意思是,如果消費者消費失敗了,rabbitmq server會自動重試3次
    #spring.rabbitmq.listener.simple.retry.enabled=true
    #消費端採用手動應答
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #預設快取模式是channel,在springboot裡面,比如在框架rabbitmqTemplate中使用的通道將會可靠地返回到快取中
    #spring.rabbitmq.cache.connection.mode=channel
    #設定預設通道快取的大小
    #spring.rabbitmq.cache.channel.size=10
    #配置生產者的配置,包括exchange,routingkey等
    java.rabbitmq.send.service.exchange=scm3.materials
    java.rabbitmq.send.service.rountkey=direct_rounting_key
    #配置supply監聽資訊
    java.rabbitmq.consumer.service.retry.exchange=scm3.materials.retry
    java.rabbitmq.consumer.service.fail.exchange=scm3.materials.fail
    [email protected]supply
    #配置user監聽資訊
    [email protected]er

2.6 工具類

    package com.yunsom.springboot.rabbitmq.util;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    public final class RabbitMqUtil {
      private RabbitMqUtil() {}
      /**
       * 獲取訊息被重試的次數
       * 
       * @param messageProperties
       * @return
       */
      @SuppressWarnings("unchecked")
      public static long getRetryCount(MessageProperties messageProperties) {
        Long retryCount = 0L;
        if (null != messageProperties) {
          Map<String, Object> headers = messageProperties.getHeaders();
          if (null != headers && !headers.isEmpty()) {
            if (headers.containsKey("x-death")) {
              List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
              if (deaths.size() > 0) {
                Map<String, Object> death = deaths.get(0);
                retryCount = (Long) death.get("count");
              }
            }
          }
        }
        return retryCount;
      }
      /**
       * 獲取原始的routingKey,這個key主要用來追蹤一開始的routing-key
       * 
       * @param properties AMQP訊息屬性
       * @param defaultValue 預設值
       * @return 原始的routing-key
       */
      private static String getOrigRoutingKey(MessageProperties messageProperties,
          String defaultValue) {
        String routingKey = defaultValue;
        if (null != messageProperties) {
          Map<String, Object> headers = messageProperties.getHeaders();
          if (null != headers && !headers.isEmpty()) {
            if (headers.containsKey("x-orig-routing-key")) {
              routingKey = headers.get("x-orig-routing-key").toString();
            }
          }
        }
        return routingKey;
      }
      private static MessageProperties createOverrideProperties(MessageProperties messageProperties,
          Map<String, Object> headers) {
        MessageProperties newMsgProperties = new MessageProperties();
        newMsgProperties.setContentType(messageProperties.getContentType());
        newMsgProperties.setContentEncoding(messageProperties.getContentEncoding());
        // 從已有的properties中建立新的properties,使用提供的headers欄位覆蓋已有的headers
        for (final Map.Entry<String, Object> mapHeaders : headers.entrySet()) {
          newMsgProperties.setHeader(mapHeaders.getKey(), mapHeaders.getValue());
        }
        newMsgProperties.setDeliveryMode(messageProperties.getDeliveryMode());
        newMsgProperties.setPriority(messageProperties.getPriority());
        newMsgProperties.setCorrelationId(messageProperties.getCorrelationId());
        newMsgProperties.setReplyTo(messageProperties.getReplyTo());
        newMsgProperties.setExpiration(messageProperties.getExpiration());
        newMsgProperties.setMessageId(messageProperties.getMessageId());
        newMsgProperties.setTimestamp(messageProperties.getTimestamp());
        newMsgProperties.setType(messageProperties.getType());
        newMsgProperties.setClusterId(messageProperties.getClusterId());
        newMsgProperties.setUserId(messageProperties.getUserId());
        newMsgProperties.setAppId(messageProperties.getAppId());
        newMsgProperties.setConsumerQueue(messageProperties.getConsumerQueue());
        newMsgProperties.setConsumerTag(messageProperties.getConsumerTag());
        return newMsgProperties;
      }
      public static Message buildMessage(Message message) {
        Map<String, Object> headers = new HashMap<>();
        return buildeLastMessage(message, headers);
      }
      public static Message buildMessage2(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (null == headers || headers.isEmpty()) {
          headers = new ConcurrentHashMap<String, Object>();
        }
        return buildeLastMessage(message, headers);
      }
      private static Message buildeLastMessage(Message message, Map<String, Object> headers) {
        headers.put("x-orig-routing-key", getOrigRoutingKey(message.getMessageProperties(),
            message.getMessageProperties().getReceivedRoutingKey()));
        MessageProperties messageProperties =
            createOverrideProperties(message.getMessageProperties(), headers);
        Message newMessage = new Message(message.getBody(), messageProperties);
        return newMessage;
      }
    }

2.7 測試

啟動springboot後,通過postman傳送一個請求,其中模擬使用者消費者消費失敗,日誌如下

    2018-05-24 15:32:36.139  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.consumer.UserRabbitMqConsumer    : 使用者服務消費者消費失敗,訊息傳送到重試佇列;原始訊息:Hello rabbitmq;第0次重試
    2018-05-24 15:32:36.185 DEBUG 372672 --- [nio-6006-exec-2] o.s.web.servlet.DispatcherServlet        : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
    2018-05-24 15:32:36.186 DEBUG 372672 --- [nio-6006-exec-2] o.s.web.servlet.DispatcherServlet        : Successfully completed request
    2018-05-24 15:33:06.187  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.service.MessageHandlerImpl       : 使用者服務消費訊息:Hello rabbitmq;該訊息的id:5add0f1b-6d6f-48b7-90be-9fc3d0514e2b
    2018-05-24 15:33:06.191  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.consumer.UserRabbitMqConsumer    : 使用者服務消費者消費失敗,訊息傳送到重試佇列;原始訊息:Hello rabbitmq;第1次重試
    2018-05-24 15:33:36.269  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.service.MessageHandlerImpl       : 使用者服務消費訊息:Hello rabbitmq;該訊息的id:5add0f1b-6d6f-48b7-90be-9fc3d0514e2b
    2018-05-24 15:33:36.270  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.consumer.UserRabbitMqConsumer    : 使用者服務消費者消費失敗,訊息傳送到重試佇列;原始訊息:Hello rabbitmq;第2次重試
    2018-05-24 15:34:06.303  INFO 372672 --- [cTaskExecutor-1] c.y.s.r.service.MessageHandlerImpl       : 使用者服務消費訊息:Hello rabbitmq;該訊息的id:5add0f1b-6d6f-48b7-90be-9fc3d0514e2b

從日誌可以看到,每隔30s重試訊息被處理一次,檢視rabbitmq的management,在失敗佇列裡面存在訊息如下,其中訊息id和日誌裡面的訊息id一樣


三、補充說明

該方案的實現,對於MQ的訊息的可靠傳輸保證,可能並不完美,後續該方案會持續維護更新。如需相關原始碼,請留言。

相關推薦

Springboot整合Rabbitmq實現消費實現可靠訊息處理

一、Rabbitmq簡介1.1 rabbitmq 架構1.2 rabbitmq相關元件介紹exchange: 交換機,主要用來將生產者傳送的訊息路由給伺服器中的佇列。routing-key: 訊息路由的key,生產者在將訊息發到到exchange的時候,需要指定routing

echarts實現雙y軸實現不同的引數使用不同的y軸

需求:折線圖實現雙y軸,並實現不同的引數使用不同的y軸 方法: 1、yAxis中新增雙軸 [{type: 'value',name: '溫度',axisLabel: {formatter: '{value}℃' } },{type: 'value',name: '百分比',

手寫實現java棧結構實現簡易的計算器(基於字尾演算法)

   一、定義   棧是一種線性表結構,棧結構中有兩端,對棧的操作都是對棧的一端進行操作的,那麼被操作的一端稱為棧頂,另一端則為棧底。對棧的操作其實就是隻有兩種,分別是入棧(也稱為壓棧)和出棧(也稱為彈棧)。入棧,將新元素壓入棧中,那麼此時這個棧元素就成為了棧頂元素,棧深度相應的+1。出棧,將棧中的

SpringBoot整合RabbitMQ實現訊息傳送和消費

下載安裝Erlang和RabbitMQ Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html   專案建立和依賴 推薦SpringCloud專案線上建立:https://start.spring.io/ 不用上面這

springboot整合rabbitMq實現訊息傳送

實現思路:利用mq的ttl設定訊息失效時間 當達到設定時間後通過交換機到達死信佇列中,消費者端繫結讀取死信佇列中資訊來達到延時傳送訊息的功能。 demo 如下: (1)在pom.xml 中引入rabbitMq相關包 <dependency>

Springboot2(28)整合rabbitmq實現訊息

原始碼地址 springboot2教程系列 rabbitmq實現訊息的確認機制和延時訊息的傳送 訊息生產者程式碼實現的主要配置 @Configuration @Slf4j public class PrividerRabbitm

springboot整合rabbitmq根據查詢的資訊建立多個訊息中心和訊息佇列實現不同的訊息傳送到不同的訊息中心

      今天接到一個需求,就是在傳送訊息到rabbitmq訊息中心的時候,需要根據裝置型別,將訊息傳送到不同的訊息佇列,因此要建立不同的訊息佇列。       修改之前是把配置資訊寫在配置文中,專案啟動時,獲取配置檔案中的配置資訊,建立訊息佇列。       修改後的邏輯

springboot-rabbitmq:實現隊列

中間 edate clas 設置 重新 letter direct chang stack 延時隊列應用於什麽場景 延時隊列顧名思義,即放置在該隊列裏面的消息是不需要立即消費的,而是等待一段時間之後取出消費。那麽,為什麽需要延遲消費呢?我們來看以下的場景 網上商城下訂

Springboot+rabbitmq實現佇列的兩種方式

什麼是延時佇列,延時佇列應用於什麼場景 延時佇列顧名思義,即放置在該佇列裡面的訊息是不需要立即消費的,而是等待一段時間之後取出消費。 那麼,為什麼需要延遲消費呢?我們來看以下的場景 網上商城下訂單後30分鐘後沒有完成支付,取消訂單(如:淘寶、去哪兒網) 系統

rabbitmq實現佇列(死信佇列)

基於佇列和基於訊息的TTL TTL是time to live 的簡稱,顧名思義指的是訊息的存活時間。rabbitMq可以從兩種維度設定訊息過期時間,分別是佇列和訊息本身。 佇列訊息過期時間-Per-Queue Message TTL: 通過設定佇列的x-message-ttl引數來設定指定佇列上訊息的存活時

Java 使用RabbitMQ外掛實現佇列

Springboot專案,windows環境 環境配置 在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。 外掛下載地址: http

RabbitMQ 釋出訂閱-實現重試佇列(參考)

RabbitMQ訊息處理失敗,我們會讓失敗訊息進入重試佇列等待執行,因為在重試佇列距離真正執行還需要定義的時間間隔,因此,我們可以將重試佇列設定成延時處理。今天參考網上其他人的實現,簡單梳理下訊息延時重試執行的思路。 消費失敗後,自動延時將訊息重新投遞,當達到一定的重試次數後,將訊息投遞到失敗訊息佇列,等待

SpringBoot整合Mybatis自定義攔截器實現拼接sql和修改

一、應用場景 1.分頁,如com.github.pagehelper的分頁外掛實現; 2.攔截sql做日誌監控; 3.統一對某些sql進行統一條件拼接,類似於分頁。 二、MyBatis的攔截器簡介 然後我們要知道攔截器攔截什麼樣的物件,攔截物件的什麼行為,什麼時候攔截? &n

springboot整合rabbitMQ實現訊息的推送

RabbitMQ訊息中介軟體元件,目前比較流行的訊息中介軟體有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。 我的程式碼中用的是RabbitMQ,先介紹幾個概念:       一:訊息佇列的特性如下: 非同步性,將耗時的同步操作通過以傳送訊息的方

SpringBoot 整合 RabbitMQ(包含三種訊息確認機制以及消費端限流)

目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認

訊息中介軟體——RabbitMQ(五)快速入門生產者與消費者SpringBoot整合RabbitMQ

前言 本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什麼意思呢?我們的生產者傳送一條訊息,投遞到RabbitMQ叢集也就是Broker。 我們的消費端進行監聽RabbitMQ,當發現佇列中有訊息後,就進行消費。 1. 環境準備 本次整合主要採用Spring

$.each遍歷實現

延時 因此 一次 ... set () down load pre 今天做項目時需要一功能,如題..... $.each(json, function(idx, obj) { setTimeout(function () {downloadFile(obj.fil

JAVA實現過期MAP 支持自定義過期觸發事件

keys 算法 public 寫入 hash pty static 實現 ssa 如題,直接上代碼: 1 import java.util.Iterator; 2 import java.util.concurrent.ConcurrentHashMap; 3

Qt實現sleep函數功能

proc IV RoCE color event return eve 函數功能 turn /* 函數名:sleep() 參 數: msec - 單位為毫秒 描 述: 延時功能 */ bool Test::sleep(unsigned i

SpringBoot系列5】SpringBoot整合RabbitMQ

urn 項目 div fin 交換 ng- eat convert sta 前言: 因為項目需要用到RabbitMQ,前幾天就看了看RabbitMQ的知識,記錄下SpringBoot整合RabbitMQ的過程。 給出兩個網址: RabbitMQ官方教程:http://