1. 程式人生 > >崛起於Springboot2.0.X之整合RabbitMQ企業所有場景開發(46)

崛起於Springboot2.0.X之整合RabbitMQ企業所有場景開發(46)

1、部落格涵蓋點

   1.1 入門級RabbitMQ,瞭解預設的五種開發方案

   1.2 使用ssm xml方式整合rabbitMq,五種模式+死信佇列方案+jdk8

   1.3 本部落格專案碼雲地址:==》springboot+RabbitMQ+所有場景

    1、fanout:釋出/訂閱模式
    2、routing:路由模式
    3、topic:萬用字元模式
    4、延遲佇列之使用CustomExchange方案:需要安裝延遲外掛(點選==》安裝詳情)
    5、延遲佇列之死信佇列

2、場景

引言:(九天部落格實時更新修改,即便你是複製到你的網站部落格,也看不到每一篇部落格的優化,不如關注我哈)
RabbitMQ 場景應用:
1、秒殺場景:高併發請求執行緒進入訊息佇列,根據先進先出原則,執行秒殺邏輯
2、延遲佇列【兩種方式 使用外掛延遲 和 死信佇列延遲】:
   2.1:使用者下訂單,但是不支付,超過30分鐘訂單自動取消
   2.2:使用者註冊成功之後,需要過一段時間比如一週後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。
   2.3: 延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段時間後自動重試
3、非同步操作【非同步操作比同步快】:
   3.1:非同步記錄使用者操作日誌:使用者登陸app時,傳送訊息到訊息佇列,由監聽器記錄使用者的登陸時間、裝置、來源ip等資訊...
   3.2:非同步傳送郵件:註冊或者忘記密碼的時候,通常網站會提示已向你的郵箱傳送一個連結,請點選。
   3.3:非同步傳送簡訊驗證碼:使用者忘記密碼或者使用手機驗證碼登陸時,可以非同步執行,沒必要讓程式序列完成所有操作後才能接收到驗證碼

3、pom檔案

     springboot 2.0.X的依賴大家自己加上去吧,應該也適用於 springboot2.1.X。

<!-- Spring Boot starter for AMQP (RabbitMQ). No version is given here; presumably it is managed by the Spring Boot parent/BOM - confirm in the full pom. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Utility library (Hutool); the controller and listener below use its DateUtil. -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.6.1</version>
</dependency>

4、application.properties

# Broker connection settings; RabbbitConfig reads these five keys through @Value.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Listener container settings. RabbbitConfig#rabbitListenerContainerFactory sets the same
# values (3 / 10 / MANUAL) in code on the factory it builds.
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual

5、java配置類

    5.1 rabbitmq配置

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Central RabbitMQ wiring: connection factory, admin, template, listener container factory and
 * message converter.
 *
 * <p>Connection settings are injected from the {@code spring.rabbitmq.*} properties. The JSON
 * message converter declared here is applied to both the publishing template and the listener
 * container factory so that producers and consumers agree on the payload format.
 */
@Configuration
@EnableRabbit
public class RabbbitConfig {

    /** Broker host name, from {@code spring.rabbitmq.host}. */
    @Value("${spring.rabbitmq.host}")
    public String host;

    /** Broker port, from {@code spring.rabbitmq.port}. */
    @Value("${spring.rabbitmq.port}")
    public int port;

    /** Login user, from {@code spring.rabbitmq.username}. */
    @Value("${spring.rabbitmq.username}")
    public String username;

    /** Login password, from {@code spring.rabbitmq.password}. */
    @Value("${spring.rabbitmq.password}")
    public String password;

    /** Virtual host, from {@code spring.rabbitmq.virtual-host}. */
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtual_host;

    /**
     * Builds the caching connection factory from the injected connection settings.
     *
     * @return the connection factory shared by the admin, template and listener containers
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtual_host);
        return connectionFactory;
    }

    /**
     * @return a {@link RabbitAdmin} bound to {@link #connectionFactory()}
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Creates the template used for publishing.
     *
     * @return a template that serializes payloads with {@link #messageConverter()}
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // Because this class defines its own template, the converter bean has to be set
        // explicitly; otherwise it would never be used.
        template.setMessageConverter(messageConverter());
        return template;
    }

    /**
     * Configures the container factory used by {@code @RabbitListener} methods: 3 to 10
     * concurrent consumers and manual acknowledgement.
     *
     * @return the listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        // Manual mode: every listener must ack (or nack) each delivery itself.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Same converter as the template so listeners can decode what the producer sent.
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    /**
     * @return the JSON converter shared by the template and the listener container factory
     */
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

    5.2 Exchange配置

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares every exchange used by the examples.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 11:04 PM
 */
@Component
@Configuration
public class ExchangeConfig {

    /** Exchanges declared with explicit flags are durable. */
    private static final boolean DURABLE = true;

    /** Exchanges declared with explicit flags are not auto-deleted. */
    private static final boolean AUTO_DELETE = false;

    /**
     * @return the fanout exchange {@code fanout_exchange}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanout = new FanoutExchange("fanout_exchange", DURABLE, AUTO_DELETE, null);
        return fanout;
    }

    /**
     * @return the direct exchange {@code direct_exchange}
     */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange direct = new DirectExchange("direct_exchange", DURABLE, AUTO_DELETE, null);
        return direct;
    }

    /**
     * @return the topic exchange {@code IExchange}
     */
    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topic = new TopicExchange("IExchange", DURABLE, AUTO_DELETE, null);
        return topic;
    }

    /**
     * Custom exchange of type {@code x-delayed-message}; the broker needs the delayed-message
     * plugin installed for this exchange to work.
     *
     * @return the exchange {@code custom_exchange}, with {@code x-delayed-type} set to direct
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(
                "custom_exchange", "x-delayed-message", DURABLE, AUTO_DELETE, arguments);
    }

    /**
     * @return the headers exchange {@code headers_exchange}
     */
    @Bean
    public HeadersExchange headersExchange() {
        HeadersExchange headers = new HeadersExchange("headers_exchange", DURABLE, AUTO_DELETE, null);
        return headers;
    }

    /**
     * Delay via dead-lettering: exchange that receives the dead-lettered messages.
     *
     * @return the direct exchange {@code immediate_exchange}, created with constructor defaults
     */
    @Bean
    public DirectExchange immediateExchange() {
        DirectExchange immediate = new DirectExchange("immediate_exchange");
        return immediate;
    }

    /**
     * Delay via dead-lettering: exchange that messages are published to before they expire.
     *
     * @return the direct exchange {@code dlx_delay_exchange}, created with constructor defaults
     */
    @Bean
    public DirectExchange dlxExchange() {
        DirectExchange delayed = new DirectExchange("dlx_delay_exchange");
        return delayed;
    }

}

    5.3 Queue配置

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares every queue used by the examples.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 11:36 PM
 */
@Configuration
@Component
public class QueueConfig {

    /**
     * @return first queue for the fanout exchange, {@code fanout_queue_1}
     */
    @Bean
    public Queue fanoutQueue1() {
        Queue fanoutOne = new Queue("fanout_queue_1");
        return fanoutOne;
    }

    /**
     * @return second queue for the fanout exchange, {@code fanout_queue_2}
     */
    @Bean
    public Queue fanoutQueue2() {
        Queue fanoutTwo = new Queue("fanout_queue_2");
        return fanoutTwo;
    }

    /**
     * @return first queue for the direct exchange, {@code direct_queue_1}
     */
    @Bean
    public Queue directQueue1() {
        Queue directOne = new Queue("direct_queue_1");
        return directOne;
    }

    /**
     * @return second queue for the direct exchange, {@code direct_queue_2}
     */
    @Bean
    public Queue directQueue2() {
        Queue directTwo = new Queue("direct_queue_2");
        return directTwo;
    }

    /**
     * @return first queue for the topic exchange, {@code topic_queue_1}
     */
    @Bean
    public Queue topicQueue1() {
        Queue topicOne = new Queue("topic_queue_1");
        return topicOne;
    }

    /**
     * @return second queue for the topic exchange, {@code topic_queue_2}
     */
    @Bean
    public Queue topicQueue2() {
        Queue topicTwo = new Queue("topic_queue_2");
        return topicTwo;
    }

    /**
     * @return the queue bound to the delayed-message exchange, {@code delay_queue}
     */
    @Bean
    public Queue delayQueue() {
        Queue delayed = new Queue("delay_queue");
        return delayed;
    }

    /**
     * Dead-letter approach: the queue that is consumed immediately.
     *
     * @return the queue {@code immediate}
     */
    @Bean
    public Queue immediateQueue() {
        Queue immediate = new Queue("immediate");
        return immediate;
    }

    /**
     * Dead-letter approach: the queue in which messages wait until they expire.
     *
     * <p>No queue-level {@code x-message-ttl} is configured; the expiration is set per message
     * by the producer service instead.
     *
     * @return the queue {@code dlx_delay_queue} with its dead-letter arguments
     */
    @Bean
    public Queue dlxDelay() {
        Map<String, Object> arguments = new HashMap<>();
        // Exchange that dead-lettered messages are forwarded to.
        arguments.put("x-dead-letter-exchange", "immediate_exchange");
        // Routing key carried by the messages when they are forwarded.
        arguments.put("x-dead-letter-routing-key", "immediate_road");

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("dlx_delay_queue", durable, exclusive, autoDelete, arguments);
    }
}

    5.4 exchange與queue關係繫結配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares every binding (exchange, queue, routing key) used by the examples.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 11:39 PM
 */
@Configuration
public class BindingConfig {

    @Autowired
    ExchangeConfig exchange;

    @Autowired
    QueueConfig queue;

    /** Binds {@code fanout_queue_1} to the fanout exchange (no routing key). */
    @Bean
    public Binding bindFanout1() {
        return BindingBuilder.bind(queue.fanoutQueue1()).to(exchange.fanoutExchange());
    }

    /** Binds {@code fanout_queue_2} to the fanout exchange (no routing key). */
    @Bean
    public Binding bindFanout2() {
        return BindingBuilder.bind(queue.fanoutQueue2()).to(exchange.fanoutExchange());
    }

    /** Binds {@code direct_queue_1} to the direct exchange with routing key {@code orange}. */
    @Bean
    public Binding bindDirectOrange() {
        return BindingBuilder.bind(queue.directQueue1()).to(exchange.directExchange()).with("orange");
    }

    /** Binds {@code direct_queue_2} to the direct exchange with routing key {@code black}. */
    @Bean
    public Binding bindDirectBlack() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("black");
    }

    /** Binds {@code direct_queue_2} to the direct exchange with routing key {@code green}. */
    @Bean
    public Binding bindDirectGreen() {
        return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("green");
    }

    /** Binds {@code topic_queue_1} to the topic exchange with pattern {@code *.orange.*}. */
    @Bean
    public Binding bindTopic1() {
        return BindingBuilder.bind(queue.topicQueue1()).to(exchange.topicExchange()).with("*.orange.*");
    }

    /** Binds {@code topic_queue_2} to the topic exchange with pattern {@code *.*.rabbit}. */
    @Bean
    public Binding bindTopic2() {
        return BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("*.*.rabbit");
    }

    /** Binds {@code topic_queue_2} to the topic exchange with pattern {@code lazy.#}. */
    @Bean
    public Binding bindTopic3() {
        return BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("lazy.#");
    }

    /** Binds {@code delay_queue} to the delayed-message exchange with key {@code delay_queue_road}. */
    @Bean
    public Binding bindCustom() {
        return BindingBuilder.bind(queue.delayQueue()).to(exchange.customExchange()).with("delay_queue_road").noargs();
    }

    /** Binds the {@code immediate} queue to {@code immediate_exchange} with key {@code immediate_road}. */
    @Bean
    public Binding immediate() {
        return BindingBuilder.bind(queue.immediateQueue()).to(exchange.immediateExchange()).with("immediate_road");
    }

    /**
     * Binds {@code dlx_delay_queue} to {@code dlx_delay_exchange} with key {@code dlx_delay_road}.
     *
     * <p>The bean is named explicitly: {@code QueueConfig} also has a {@code @Bean} method called
     * {@code dlxDelay}, and since default bean names are taken from the method name the two
     * definitions would otherwise share the name {@code dlxDelay} and clash.
     */
    @Bean("dlxDelayBinding")
    public Binding dlxDelay() {
        return BindingBuilder.bind(queue.dlxDelay()).to(exchange.dlxExchange()).with("dlx_delay_road");
    }
}

6、實體類

import java.io.Serializable;

/**
 * Simple serializable message payload used by the RabbitMQ examples.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 6:01 PM
 */
public class Mail implements Serializable {
    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    // Kept as a boxed Double so the serialized field type stays the same; it is null until a
    // weight has been assigned (e.g. after the no-arg constructor).
    private Double weight;


    /** Creates an empty mail; all fields are unset. */
    public Mail() {
    }

    /**
     * Creates a fully populated mail.
     *
     * @param mailId  identifier of the mail
     * @param country country value
     * @param weight  weight value
     */
    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    /** @return the mail identifier, or {@code null} if unset */
    public String getMailId() {
        return mailId;
    }

    /** @param mailId the mail identifier */
    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    /** @return the country, or {@code null} if unset */
    public String getCountry() {
        return country;
    }

    /** @param country the country value */
    public void setCountry(String country) {
        this.country = country;
    }

    /**
     * Returns the weight.
     *
     * @return the weight, or {@code 0.0} when no weight has been set; the null check avoids the
     *         NullPointerException that unboxing an unset {@code Double} would cause
     */
    public double getWeight() {
        return weight == null ? 0.0 : weight;
    }

    /** @param weight the weight value */
    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }
}

7、service層

/**
 * Message-publishing operations used by the examples; implemented by ProducerImpl.
 */
public interface Producer {
    /**
     * Publishes {@code object} to {@code exchange} using the given routing key.
     */
    void sendMessage(String exchange, String rountingKey, Object object);

    /**
     * Publishes {@code object} with a delay of {@code time}; the implementation passes the
     * value as the {@code x-delay} message header. The controller calls this with 3000 and
     * describes the unit as milliseconds.
     */
    void delayMessage(String exchange, String rountingKey, long time, Object object);

    /**
     * Publishes {@code object} with a per-message expiration of {@code time}; the
     * implementation sets it as the message's expiration property.
     * NOTE(review): unit presumably milliseconds, as for delayMessage - confirm.
     */
    void dlxDelayMessage(String exchange, String rountingKey, long time, Object object);

    /**
     * Publishes {@code object} through the template's send-and-receive call; no reply is
     * returned to the caller.
     */
    void sendAndReceive(String exchange, String rountingKey, Object object);
}
import com.example.rabbit.service.Producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * {@link Producer} implementation that delegates every operation to {@link RabbitTemplate}.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 9:52 PM
 */
@Service
@Transactional
public class ProducerImpl implements Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Converts {@code object} and publishes it to {@code exchange} with {@code rountingKey}.
     */
    @Override
    public void sendMessage(String exchange, String rountingKey, Object object) {
        rabbitTemplate.convertAndSend(exchange, rountingKey, object);
    }

    /**
     * Publishes {@code object} and stamps the outgoing message with an {@code x-delay} header
     * holding {@code time}.
     */
    @Override
    public void delayMessage(String exchange, String rountingKey, long time, Object object) {
        rabbitTemplate.convertAndSend(
                exchange,
                rountingKey,
                object,
                outgoing -> {
                    outgoing.getMessageProperties().setHeader("x-delay", time);
                    return outgoing;
                });
    }

    /**
     * Publishes {@code object} and sets the outgoing message's expiration to {@code time},
     * rendered as a decimal string.
     */
    @Override
    public void dlxDelayMessage(String exchange, String rountingKey, long time, Object object) {
        final String expiration = String.valueOf(time);
        rabbitTemplate.convertAndSend(
                exchange,
                rountingKey,
                object,
                outgoing -> {
                    outgoing.getMessageProperties().setExpiration(expiration);
                    return outgoing;
                });
    }

    /**
     * Sends and consumes in one step, provided the listener's business logic raises no
     * exception. The reply returned by the template is discarded.
     */
    @Override
    public void sendAndReceive(String exchange, String rountingKey, Object object) {
        rabbitTemplate.convertSendAndReceive(exchange, rountingKey, object);
    }

}

8、controller層

import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.example.rabbit.service.impl.ProducerImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Random;

/**
 * REST endpoints that publish a random {@link Mail} to each exchange so the listeners can be
 * exercised by hand.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 10:23 PM
 */
@RestController
public class RabbitController {
    /** Shared generator for {@link #randomMail()}; avoids creating new instances per call. */
    private static final Random RANDOM = new Random();

    @Autowired
    ProducerImpl producer;

    /**
     * Publishes to the fanout exchange. Fanout exchanges ignore the routing key, so an empty
     * key is sent.
     */
    @RequestMapping(value = "/fanout")
    public void fanout() {
        Mail mail = randomMail();
        producer.sendMessage("fanout_exchange", "", mail);
    }

    /**
     * Publishes to the direct exchange.
     *
     * <p>The exchange is only bound with the keys {@code orange}, {@code black} and
     * {@code green}; an empty key matches none of them and the message would be dropped, so
     * {@code orange} (bound to {@code direct_queue_1}) is used.
     */
    @RequestMapping(value = "/direct")
    public void direct() {
        Mail mail = randomMail();
        producer.sendMessage("direct_exchange", "orange", mail);
    }

    /**
     * Publishes to the topic exchange with a key matching the {@code *.orange.*} pattern.
     * A key such as {@code lazy.mm} would match the {@code lazy.#} pattern instead.
     */
    @RequestMapping(value = "/topic")
    @ResponseBody
    public void topic() {
        Mail mail = randomMail();
        producer.sendMessage("IExchange", "love.orange.hate", mail);
    }

    /**
     * Delayed-queue test using the delayed-message exchange; the delay is in milliseconds.
     */
    @GetMapping(value = "/delay")
    @ResponseBody
    public void delay() {
        Mail mail  = randomMail();
        String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
        System.out.println("延遲傳送時間:"+now+"資料:"+mail.toString());
        producer.delayMessage("custom_exchange","delay_queue_road",3000,mail);
    }

    /**
     * Delayed-queue test using the dead-letter approach.
     */
    @GetMapping(value = "/dlxDelay")
    public void dlxDelay() {
        Mail mail  = randomMail();
        String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss");
        System.out.println("延遲傳送時間:"+now+"資料:"+mail.toString());
        producer.dlxDelayMessage("dlx_delay_exchange","dlx_delay_road",3000,mail);
    }

    /**
     * Creates a {@link Mail} with a random id in [0, 100), country {@code China} and a random
     * weight in [0, 1), for use by the test endpoints.
     *
     * @return the generated mail
     */
    public static Mail randomMail() {
        Mail mail = new Mail();
        mail.setMailId(RANDOM.nextInt(100)+"");
        mail.setCountry("China");
        mail.setWeight(RANDOM.nextDouble());
        return mail;
    }

}

9、監聽器

import cn.hutool.core.date.DateUtil;
import com.example.rabbit.entity.Mail;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;

/**
 * Consumers for all example queues.
 *
 * <p>A delivery consists of a payload ({@code @Payload}) and headers ({@code @Headers} or a
 * single {@code @Header}); the methods below show {@code @Payload}, {@code @Headers},
 * {@code @Header(name = "amqp_deliveryTag")}, {@code @RabbitListener} and
 * {@code @RabbitHandler}.
 *
 * <p>The listener container factory is configured for manual acknowledgement, so every
 * listener acknowledges its delivery explicitly once it has been handled; a delivery that is
 * never acknowledged stays unacked on the broker.
 *
 * Author: MuJiuTian
 * Created: 2019/8/19, 10:06 PM
 */
@Component
public class MyListener {

    @Autowired
    RabbitTemplate rabbitTemplate;


    /** Consumes {@code fanout_queue_1}: prints the mail and acknowledges the delivery. */
    @RabbitListener(queues = "fanout_queue_1")
    public void fanoutQueue1(@Payload Mail mail,
                             @Header(name = "amqp_deliveryTag") long deliveryTag,
                             Channel channel) throws IOException {
        System.out.println("fanout_queue_1佇列取出訊息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /** Consumes {@code fanout_queue_2}: prints the mail and acknowledges the delivery. */
    @RabbitListener(queues = "fanout_queue_2")
    public void fanoutQueue2(@Payload Mail mail,
                             @Header(name = "amqp_deliveryTag") long deliveryTag,
                             Channel channel) throws IOException {
        System.out.println("fanout_queue_2佇列取出訊息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /** Consumes {@code direct_queue_1}: prints the mail and acknowledges the delivery. */
    @RabbitListener(queues = "direct_queue_1")
    public void directQueue1(@Payload Mail mail,
                             @Header(name = "amqp_deliveryTag") long deliveryTag,
                             Channel channel) throws IOException {
        System.out.println("direct_queue_1佇列取出訊息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /** Consumes {@code direct_queue_2}: prints the mail and acknowledges the delivery. */
    @RabbitListener(queues = "direct_queue_2")
    public void directQueue2(@Payload Mail mail,
                             @Header(name = "amqp_deliveryTag") long deliveryTag,
                             Channel channel) throws IOException {
        System.out.println("direct_queue_2佇列取出訊息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /** Consumes {@code topic_queue_1}: prints the mail and acknowledges the delivery. */
    @RabbitListener(queues = "topic_queue_1")
    public void topicQueue1(@Payload Mail mail,
                            @Header(name = "amqp_deliveryTag") long deliveryTag,
                            Channel channel) throws IOException {
        System.out.println("從topic_queue_1取出訊息"+mail.toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code topic_queue_2}, reading the delivery tag from the full header map.
     *
     * @param mail    the converted payload
     * @param heads   all message headers, including {@code amqp_deliveryTag}
     * @param channel the channel the delivery arrived on, used to acknowledge it
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "topic_queue_2")
    public void topicQueue2(@Payload Mail mail, @Headers Map<String,Object> heads,Channel channel) throws IOException {
        System.out.println("到達監聽器,準備處理RabbitMQ業務邏輯,從topic_queue_2取出訊息=="+mail.toString());
        // Step 1: business logic goes here, e.g. a flash-sale purchase.

        // Step 2: once the business logic has succeeded, acknowledge this delivery only
        // (multiple = false), as the other listeners do.
        long deliveryTag = Long.parseLong(heads.get("amqp_deliveryTag").toString());
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Consumes {@code delay_queue}, printing the time at which the delayed message arrived.
     *
     * @param mail        the converted payload
     * @param deliveryTag the delivery tag taken from the {@code amqp_deliveryTag} header
     * @param channel     the channel the delivery arrived on, used to acknowledge it
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "delay_queue")
    public void delay(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag,Channel channel) throws IOException {
        System.out.println("延遲佇列接受時間:"+ DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"));

        // Step 1: business logic goes here, e.g. cancelling an order that was not paid within
        // 30 minutes; omitted because the example only demonstrates the delay itself.

        // Step 2: once the business logic has succeeded, acknowledge the delivery.
        channel.basicAck(deliveryTag,false);
    }

    /**
     * Consumes the {@code immediate} queue, which receives messages dead-lettered from the
     * delay queue: prints the current time and the mail, then acknowledges the delivery.
     */
    @RabbitListener(queues = "immediate")
    @RabbitHandler
    public void immediate(@Payload Mail mail,
                          @Header(name = "amqp_deliveryTag") long deliveryTag,
                          Channel channel) throws IOException {
        System.out.println("此刻時間是:"+ DateUtil.format(new Date(), DateFormat.getDateTimeInstance())+"要處理的資料="+mail);
        channel.basicAck(deliveryTag, false);
    }
}

10、專案啟動

    專案啟動後,開啟localhost:15672,裡面的exchange和queue會自動配置好,不過還是要檢查一下exchange和queue之間的繫結關係是否正確,確認無誤後即可進行測試,如下:

    10.1 topic測試:http://localhost:8080/topic

    10.2 延遲佇列,使用CustomExchange測試:http://localhost:8080/delay

    10.3 延遲佇列,方式二,使用死信佇列方式測試:http://localhost:8080/dlxDelay

喜歡我就關注我吧....嘻嘻嘻。        <