1. 程式人生 > >springboot結合Rabbitmq例項分析

springboot結合Rabbitmq例項分析

一.前言

本文介紹springboot整合Rabbitmq的具體使用.rabbitmq採用centos的安裝方式,具體詳細安裝方法可參考前面的文章:https://blog.csdn.net/u010520146/article/details/84454004

二.相關概念

訊息佇列通常有三個概念:傳送訊息(生產者)、佇列、接收訊息(消費者)。RabbitMQ在這個基本概念之上,多做了一層抽象,在傳送訊息和佇列之間,加入了交換機。這樣傳送訊息和佇列就沒有直接關係,而是通過交換機來做轉發,交換機會根據分發策略把訊息轉給佇列。

三.開發例項

本文采用springboot的版本為1.5.9.RELEASE

1.pom.xml加入
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
2.配置檔案 application.yml中加入

其中注意埠號為5672

spring:
  rabbitmq:
    host: 192.168.153.135
    port: 5672
    username:
lss password: 888888 virtual-host: /
3.配置檔案

(1) 建立連線工廠例項,配置連線資訊

@Configuration
public class RabbitConfig {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Bean @ConfigurationProperties(prefix = "spring.rabbitmq") public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; }

(2)例項化RabbitTemplate 訊息模板

@Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必須是prototype型別
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

(3) 例項化A.B.C.D佇列,以供測試

	public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    public static final String QUEUE_C = "QUEUE_C";
    public static final String QUEUE_D = "QUEUE_D";

  /**
     * 例項化佇列
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //佇列持久
    }

    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //佇列持久
    }

    @Bean
    public Queue queueC() {
        return new Queue(QUEUE_C, true); //佇列持久
    }
    @Bean
    public Queue queueD() {
        return new Queue(QUEUE_D, true); //佇列持久
    }

(4) 例項化4種不同交換機以供測試

FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
HeadersExchange :通過新增屬性key-value匹配
DirectExchange:按照routingkey分發到指定佇列
TopicExchange:多關鍵字匹配

    public static final String EXCHANGE_A = "ecchange_fanout";
    public static final String EXCHANGE_B = "exchange_direct";
    public static final String EXCHANGE_C = "exchange_header";
    public static final String EXCHANGE_D = "exchange_topic";

	@Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_A);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_B);
    }

    @Bean
    public HeadersExchange headersexchange() {
        return new HeadersExchange(EXCHANGE_C);
    }

    @Bean
    public TopicExchange topicexchange() {
        return new TopicExchange(EXCHANGE_D);
    }

(5) 繫結佇列到交換機
針對DirectExchange交換機:把佇列A繫結到交換機上面

 @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(directExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

針對FanoutExchange交換機,將A.B.C佇列繫結到交換機A上面

@Bean
    Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueC).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeD(Queue queueD, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueD).to(fanoutExchange);
    }	

針對主題模式交換機 字首匹配到topic.即可接受

  @Bean
    Binding bindingExchangeMessage2(Queue queueD, TopicExchange exchange) {
        return BindingBuilder.bind(queueD).to(exchange).with("topic.#");
    }

針對主題模式交換機 字首匹配到topic.lss0555 即可接受

    @Bean
    Binding bindingExchangeMessage(Queue queueC, TopicExchange exchange) {
        return BindingBuilder.bind(queueC).to(exchange).with("topic.lss0555");
    }
4.例項化5個訊息接收器以供測試使用

1.QueueAReceiver_A
QueueAReceiver_A繫結的是訊息佇列QUEUE_A

@Component
public class QueueAReceiver_A {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    public void process(String msg) {
        logger.info("接收處理佇列A訊息: " +msg);
    }
}

2.QueueBReceiver_B1
QueueAReceiver_B1繫結的是訊息佇列QUEUE_B

@Component
public class QueueBReceiver_B1 {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_B)
    public void process(String content) {
        logger.info("接收處理佇列B1訊息: " + content);
    }
}

3.QueueBReceiver_B2
QueueAReceiver_B2繫結的是訊息佇列QUEUE_B

@Component
public class QueueBReceiver_B2 {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_B)
    public void process(String content) {
        logger.info("接收處理佇列B2訊息: " + content);
    }
}

4.QueueBReceiver_C
QueueAReceiver_C繫結的是訊息佇列QUEUE_C

@Component
public class QueueBReceiver_C {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_C)
    public void process(String content) {
        logger.info("接收處理佇列C訊息: " + content);
    }
}

5.QueueBReceiver_D
QueueAReceiver_D繫結的是訊息佇列QUEUE_D

@Component
public class QueueBReceiver_D {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @RabbitHandler
    @RabbitListener(queues = RabbitConfig.QUEUE_D)
    public void process(String content) {
        logger.info("接收處理佇列D訊息: " + content);
    }
}

5.不同模式測試

一 .單傳送單接收

如下圖所示:P代表生產者,C代表消費者,紅色程式碼訊息佇列。P將訊息傳送到訊息佇列,C對訊息進行處理
在這裡插入圖片描述
1.建立一個生產者
傳送訊息到訊息佇列A,相應的佇列接收器是QueueAReceiver_A

 @Autowired
 private RabbitTemplate rabbitTemplate;
 public void sendMsg1(String content) {
   rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_A,content);
}

2.建立一個測試用例

 @Autowired
 RabbitMsgProduct msgProducer;
 
 @GetMapping("/sendMsg1")
    public String sendMsg1(String msg){
        msgProducer.sendMsg1(msg);
        return "success";
    }

訪問 http://localhost:8085/sendMsg1?msg=hello
結果如下,符合預期

INFO  c.e.s.r.MsgReceive.QueueAReceiver_A - 接收處理佇列A訊息: hello 
二 .工作模式(競爭)

競爭消費者如下圖:一個生產者,一個佇列,多個消費者。
同樣是點對點模式,但是在消費者之間,對消費佇列是有一些規則策略的,如:公平分發策略,輪詢分發策略等等。
在這裡插入圖片描述
1.建立訊息生產者
繫結到訊息佇列B,相應的訊息佇列接收器有QueueBReceiver_B1,QueueBReceiver_B2

public void sendMsg2(String content) {
     rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_B,content);
}

2.新建一個測試用例

 @GetMapping("/sendMsg2")
    public String sendMsg2(String msg){
      msgProducer.sendMsg2(msg);
      return "success";
 }

多次訪問: http://localhost:8085/sendMsg2?msg=hello
結果如下,符合預期

INFO  c.e.s.r.M.QueueBReceiver_B2 - 接收處理佇列B2訊息: hello 
INFO  c.e.s.r.M.QueueBReceiver_B1 - 接收處理佇列B1訊息: hello 
 INFO  c.e.s.r.M.QueueBReceiver_B2 - 接收處理佇列B2訊息: hello
三.釋出訂閱模式

如下圖:生產者將訊息不是直接傳送到佇列,而是傳送到X交換機,然後由交換機發送給兩個佇列,兩個消費者各自監聽一個佇列,來消費訊息。
這種方式實現同一個訊息被多個消費者消費。工作模式是同一個訊息只能有一個消費者。
在這裡插入圖片描述
1.新建一個訊息生產者
首先建立三個佇列QUEUE_A,QUEUE_B,QUEUE_C
然後建立交換機 fanoutExchange ,再將三個佇列繫結到該交換機上,這幾步在前面配置檔案已有說明。
接著,新建訊息生產者,將訊息傳送到交換機ecchange_fanout上

 public void sendMsg3(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }

2.新建測試用例

 @GetMapping("/sendMsg3")
    public String sendMsg3(String msg){
       msgProducer.sendMsg3(msg);
       return "success";
   }

訪問: http://localhost:8085/sendMsg3?msg=hello
結果如下,符合預期

INFO  c.e.s.r.M.QueueBReceiver_B2 - 接收處理佇列B2訊息: hello 
INFO  c.e.s.r.MsgReceive.QueueBReceiver_C - 接收處理佇列C訊息: hello 
INFO  c.e.s.r.MsgReceive.QueueBReceiver_D - 接收處理佇列D訊息: hello 

如果繫結的是DirectExchange型別交換機,該交換機繫結的是訊息佇列QUEUE_A,則新建訊息生產者

 public void sendMsg4(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_B, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }

測試,訪問: http://localhost:8085/sendMsg4?msg=hello
結果如下,只有接收器QueueAReceiver_A收到訊息,符合預期

INFO  c.e.s.r.MsgReceive.QueueAReceiver_A - 接收處理佇列A訊息: hello 
四.主題模式

如下圖所示:傳送端不只按固定的routing key傳送訊息,而是按字串匹配發送,接收端同樣如此,符號#匹配一個或多個詞,符號*匹配不多不少一個詞。
在這裡插入圖片描述
1.新建訊息生產者
首先建立TopicExchange型別交換機,即

	@Bean
    public TopicExchange topicexchange() {
        return new TopicExchange(EXCHANGE_D);
    }

然後建立不同匹配模式繫結到訊息佇列

  //針對主題模式交換機,繫結到訊息佇列C  字首匹配到topic.lss0555 即可接受
    @Bean
    Binding bindingExchangeMessage(Queue queueC, TopicExchange exchange) {
        return BindingBuilder.bind(queueC).to(exchange).with("topic.lss0555");
    }
    //針對主題模式交換機,繫結到訊息佇列D  字首匹配到topic. 即可接受
    @Bean
    Binding bindingExchangeMessage2(Queue queueD, TopicExchange exchange) {
        return BindingBuilder.bind(queueD).to(exchange).with("topic.#");
    }

新建3個訊息生產者,傳送訊息到到交換機上,匹配關鍵字,以供測試使用

匹配關鍵字 topic.12345

  public void sendMsg6_1(String content) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_D, "topic.12345", content);
    }

匹配關鍵字 topic.lss0555

   public void sendMsg6_2(String content) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_D, "topic.lss0555", content);
    }

匹配關鍵字 topic.lss05556666

  public void sendMsg6_3(String content) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_D, "topic.lss05556666", content);
    }

訪問方法 sendMsg6_1
結果如下,符合預期

INFO  c.e.s.r.MsgReceive.QueueBReceiver_D - 接收處理佇列D訊息: hello 

訪問方法 sendMsg6_2
結果如下,符合預期

INFO  c.e.s.r.MsgReceive.QueueBReceiver_C - 接收處理佇列C訊息: hello 
INFO  c.e.s.r.MsgReceive.QueueBReceiver_D - 接收處理佇列D訊息: hello 

訪問方法 sendMsg6_3
結果如下,符合預期

INFO  c.e.s.r.MsgReceive.QueueBReceiver_D - 接收處理佇列D訊息: hello