1. 程式人生 > >Springboot整合一之Springboot整合RabbitMQ

Springboot整合一之Springboot整合RabbitMQ

1.首先我們簡單瞭解一下訊息中介軟體的應用場景

非同步處理 場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊,傳統的做法有兩種1.序列的方式;2.並行的方式  (1)序列方式:將註冊資訊寫入資料庫後,傳送註冊郵件,再發送註冊簡訊,以上三個任務全部完成後才返回給客戶端。 這有一個問題是,郵件,簡訊並不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西. 

(2)並行方式:將註冊資訊寫入資料庫後,傳送郵件的同時,傳送簡訊,以上三個任務完成後,返回給客戶端,並行的方式能提高處理的時間。 

假設三個業務節點分別使用50ms,序列方式使用時間150ms,並行使用時間100ms。雖然並性已經提高的處理時間,但是,前面說過,郵件和簡訊對我正常的使用網站沒有任何影響,客戶端沒有必要等著其傳送完成才顯示註冊成功,英愛是寫入資料庫後就返回.  (3)訊息佇列  引入訊息佇列後,把傳送郵件,簡訊不是必須的業務邏輯非同步處理 

由此可以看出,引入訊息佇列後,使用者的響應時間就等於寫入資料庫的時間+寫入訊息佇列的時間(可以忽略不計),引入訊息佇列後處理後,響應時間是序列的3倍,是並行的2倍。

 應用解耦

場景:雙11是購物狂節,使用者下單後,訂單系統需要通知庫存系統,傳統的做法就是訂單系統呼叫庫存系統的介面. 

這種做法有一個缺點:

  • 當庫存系統出現故障時,訂單就會失敗。
  • 訂單系統和庫存系統高耦合.  引入訊息佇列 

訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功。

庫存系統:訂閱下單的訊息,獲取下單訊息,進行庫操作。  就算庫存系統出現故障,訊息佇列也能保證訊息的可靠投遞,不會導致訊息丟失。 流量削峰 流量削峰一般在秒殺活動中應用廣泛  場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前端加入訊息佇列。  作用:  1.可以控制活動人數,超過此一定閥值的訂單直接丟棄(我為什麼秒殺一次都沒有成功過呢^^)  2.可以緩解短時間的高流量壓垮應用(應用程式按自己的最大處理能力獲取訂單) 

1.使用者的請求,伺服器收到之後,首先寫入訊息佇列,加入訊息佇列長度超過最大值,則直接拋棄使用者請求或跳轉到錯誤頁面. 

2.秒殺業務根據訊息佇列中的請求資訊,再做後續處理.

以上內容的來源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感謝

2.各種訊息中介軟體效能的比較:

TPS比較 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。

持久化訊息比較—zeroMq不支援,activeMq和rabbitMq都支援。持久化訊息主要是指:MQ down或者MQ所在的伺服器down了,訊息不會丟失的機制。

可靠性、靈活的路由、叢集、事務、高可用的佇列、訊息排序、問題追蹤、視覺化管理工具、外掛系統、社群—RabbitMq最好,ActiveMq次之,ZeroMq最差。

高併發—從實現語言來看,RabbitMQ最高,原因是它的實現語言是天生具備高併發高可用的erlang語言。

綜上所述:RabbitMQ的效能相對來說更好更全面,是訊息中介軟體的首選。

3.接下來我們在springboot當中整合使用RabbitMQ

第一步:匯入maven依賴  

 <!-- rabbitmq依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
 	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:在application.yml檔案中進行RabbitMQ的相關配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest    # guest 3.0以後預設只能本地登陸
    password: guest
    publisher-confirms: true  #  訊息傳送到交換機確認機制,是否確認回撥
    publisher-returns: true
    virtual-host: /
    connection-timeout: 6000
server:
  port: 8080

注:使用場景:

  • 如果訊息沒有到exchange,則confirm回撥,ack=false
  • 如果訊息到達exchange,則confirm回撥,ack=true
  • exchange到queue成功,則不回撥return
  • exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)

第二步:進行相關配置

ExchangeConfig    訊息交換機配置

package com.space.rabbitmq.config;
 
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * 訊息交換機配置  可以配置多個
 * @author zhuzhe
 * @date 2018/5/25 15:40
 * @email [email protected]
 */
@Configuration
public class ExchangeConfig {
 
    /**
     *   1.定義direct exchange,繫結queueTest
     *   2.durable="true" rabbitmq重啟的時候不需要建立新的交換機
     *   3.direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,訊息就被投送到相關的佇列
     *     fanout交換器中沒有路由鍵的概念,他會把訊息傳送到所有繫結在此交換器上面的佇列中。
     *     topic交換器你採用模糊匹配路由鍵的原則進行轉發訊息到佇列中
     *   key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
     *   訊息將會轉發給queue引數指定的訊息佇列
     */
    @Bean
    public DirectExchange directExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false);
        return directExchange;
    }
}
QueueConfig  佇列配置
package com.space.rabbitmq.config;
 
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * 佇列配置  可以配置多個佇列
 * @author zhuzhe
 * @date 2018/5/25 13:25
 * @email [email protected]
 */
@Configuration
public class QueueConfig {
 
    @Bean
    public Queue firstQueue() {
        /**
         durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列
         auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false
         exclusive  表示該訊息佇列是否只在當前connection生效,預設是false
         */
        return new Queue("first-queue",true,false,false);
    }
 
    @Bean
    public Queue secondQueue() {
        return new Queue("second-queue",true,false,false);
    }
}
RabbitMqConfig RabbitMq配置
package com.space.rabbitmq.config;

import com.space.rabbitmq.mqcallback.MsgSendConfirmCallBack;
import com.space.rabbitmq.mqcallback.MsgSendReturnCallback;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMq配置
 * @author zhuzhe
 * @date 2018/5/25 13:37
 * @email [email protected].com
 */
@Configuration
public class RabbitMqConfig {

    /** 訊息交換機的名字*/
    public static final String EXCHANGE = "exchangeTest";
    /*對列名稱*/
    public static final String QUEUE_NAME1 = "first-queue";
    public static final String QUEUE_NAME2 = "second-queue";

    /*
    *
    * key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
    *   訊息將會轉發給queue引數指定的訊息佇列
    */
    /** 佇列key1*/
    public static final String ROUTINGKEY1 = "queue_one_key1";
    /** 佇列key2*/
    public static final String ROUTINGKEY2 = "queue_one_key2";

    @Autowired
    private QueueConfig queueConfig;
    @Autowired
    private ExchangeConfig exchangeConfig;

    /**
     * 連線工廠
     */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 將訊息佇列1和交換機進行繫結,指定佇列key1
     */
    @Bean
    public Binding binding_one() {
        return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
    }

    /**
     * 將訊息佇列2和交換機進行繫結,指定佇列key2
     */
    @Bean
    public Binding binding_two() {
        return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
    }

    /**
     * queue listener  觀察 監聽模式
     * 當有訊息到達時會通知監聽在對應的佇列上的監聽物件
     * @return
     */
    /*@Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
        simpleMessageListenerContainer.setExposeListenerChannel(true);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
        return simpleMessageListenerContainer;
    }*/

    /**
     * 自定義rabbit template用於資料的接收和傳送
     * 可以設定訊息確認機制和回撥
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // template.setMessageConverter(); 可以自定義訊息轉換器  預設使用的JDK的,所以訊息物件需要實現Serializable

        /**若使用confirm-callback或return-callback,
         * 必須要配置publisherConfirms或publisherReturns為true
         * 每個rabbitTemplate只能有一個confirm-callback和return-callback
         */
        template.setConfirmCallback(msgSendConfirmCallBack());

        /**
         * 使用return-callback時必須設定mandatory為true,或者在配置中設定mandatory-expression的值為true,
         * 可針對每次請求的訊息去確定’mandatory’的boolean值,
         * 只能在提供’return -callback’時使用,與mandatory互斥
         */
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }

    /*  關於 msgSendConfirmCallBack 和 msgSendReturnCallback 的回撥說明:
        1.如果訊息沒有到exchange,則confirm回撥,ack=false
        2.如果訊息到達exchange,則confirm回撥,ack=true
        3.exchange到queue成功,則不回撥return
        4.exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
    */

    /**
     * 訊息確認機制
     * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,
     * 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。
     * 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)
     * 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
     * @return
     */
    @Bean
    public MsgSendConfirmCallBack msgSendConfirmCallBack(){
        return new MsgSendConfirmCallBack();
    }

    @Bean
    public MsgSendReturnCallback msgSendReturnCallback(){
        return new MsgSendReturnCallback();
    }
}

訊息回撥

package com.space.rabbitmq.mqcallback;
 
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
 
/**
 * 訊息傳送到交換機確認機制
 * @author zhuzhe
 * @date 2018/5/25 15:53
 * @email [email protected]
 */
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MsgSendConfirmCallBack  , 回撥id:" + correlationData);
        if (ack) {
            System.out.println("訊息消費成功");
        } else {
            System.out.println("訊息消費失敗:" + cause+"\n重新發送");
        }
    }
}
package com.space.rabbitmq.mqcallback;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * @author zhuzhe
 * @date 2018/5/25 15:54
 * @email [email protected]
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("回饋訊息:"+message);
    }
}

生產者/訊息傳送者

package com.space.rabbitmq.sender;
 
import com.space.rabbitmq.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.UUID;
 
/**
 * 訊息傳送  生產者1
 * @author zhuzhe
 * @date 2018/5/25 14:28
 * @email [email protected]
 */
@Slf4j
@Component
public class FirstSender {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    /**
     * 傳送訊息
     * @param uuid
     * @param message  訊息
     */
    public void send(String uuid,Object message) {
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,
                message, correlationId);
    }
}

消費者

package com.space.rabbitmq.receiver;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * 訊息消費者1
 * @author zhuzhe
 * @date 2018/5/25 17:32
 * @email [email protected]
 */
@Component
public class FirstConsumer {
 
    @RabbitListener(queues = {"first-queue","second-queue"}, containerFactory = "rabbitListenerContainerFactory")
    public void handleMessage(String message) throws Exception {
        // 處理訊息
        System.out.println("FirstConsumer {} handleMessage :"+message);
    }
}

測試

package com.space.rabbitmq.controller;
 
import com.space.rabbitmq.sender.FirstSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.UUID;
 
/**
 * @author zhuzhe
 * @date 2018/5/25 16:00
 * @email [email protected]
 */
@RestController
public class SendController {
 
    @Autowired
    private FirstSender firstSender;
 
    @GetMapping("/send")
    public String send(String message){
        String uuid = UUID.randomUUID().toString();
        firstSender.send(uuid,message);
        return uuid;
    }
}

此時,我們就可以啟動專案,進行測試了