1. 程式人生 > >SpringBoot 整合 RabbitMQ(包含三種訊息確認機制以及消費端限流)

SpringBoot 整合 RabbitMQ(包含三種訊息確認機制以及消費端限流)

目錄

  • 說明
  • 生產端
  • 消費端

說明

本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認模式
同時消費端也採取了限流的措施,如果對限流細節有興趣請參照之前的文章閱讀:消費端限流

生產端

首先引入 maven 依賴

        <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>

Application.properties 中進行設定,開啟 confirm 確認機制,開啟 return 確認模式,設定 mandatory屬性 為 true,當設定為 true 的時候,路由不到佇列的訊息不會被自動刪除,從而才可以被 return 訊息模式監聽到。

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

#開啟 confirm 確認機制
spring.rabbitmq.publisher-confirms=true
#開啟 return 確認機制
spring.rabbitmq.publisher-returns=true
#設定為 true 後 消費者在訊息沒有被路由到合適佇列情況下會被return監聽,而不會自動刪除
spring.rabbitmq.template.mandatory=true
        

建立佇列和交換機,此處不應該建立 ConnectionFactory 和 RabbitAdmin,應該在 application.properties 中設定使用者名稱、密碼、host、埠、虛擬主機即可。

import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {
//    @Bean
//    public ConnectionFactory connectionFactory(){
//        return new CachingConnectionFactory();
//    }
//
//    @Bean
//    public RabbitAdmin rabbitAdmin(){
//        return new RabbitAdmin(connectionFactory());
//    }
    @Bean
    public Exchange bootExchange(){
        return new TopicExchange("BOOT-EXCHANGE-1", true, false);
    }

    @Bean
    public Queue bootQueue(){
        return new Queue("boot.queue1", true);
    }
}

如果程式有特殊的設定要求,追求更靈活的設定可以參考以下方式進行編碼設定,從而不用在application.properties 指定。例如我們在測試環境和生產環境中配置的虛擬主機、密碼不同、我們可以在程式中判斷處於哪種環境,靈活切換設定。

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        if("生產環境"){
          connectionFactory.set.....
        } else {
          ......
        }
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin();
        rabbitAdmin.setAutoStartup(true);
        return new RabbitAdmin(connectionFactory());
    }

MQSender程式碼如下,包含傳送訊息以及新增 confirm 監聽、新增 return 監聽。如果消費端要設定為手工 ACK ,那麼生產端傳送訊息的時候一定傳送 correlationData ,並且全域性唯一,用以唯一標識訊息。

import com.anqi.mq.bean.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Map;

@Component
public class MQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback= new RabbitTemplate.ConfirmCallback() {

        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData: " + correlationData);
            System.out.println("ack: " + ack);
            if(!ack){
                System.out.println("異常處理....");
            }
        }

    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {

        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //傳送訊息方法呼叫: 構建Message訊息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageProperties mp = new MessageProperties();
        //在生產環境中這裡不用Message,而是使用 fastJson 等工具將物件轉換為 json 格式傳送
        Message msg = new Message(message.toString().getBytes(),mp);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 時間戳 全域性唯一
        CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
        rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", msg, correlationData);
    }
    //傳送訊息方法呼叫: 構建Message訊息
    public void sendUser(User user) throws Exception {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 時間戳 全域性唯一
        CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
        rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", user, correlationData);
    }
}


消費端

在實際生產環境中,生產端和消費端一般都是兩個系統,我們在此也將拆分成兩個專案。

以下為消費端的 application.properties 中的配置,首先配置手工確認模式,用於 ACK 的手工處理,這樣我們可以保證訊息的可靠性送達,或者在消費端消費失敗的時候可以做到重回佇列、根據業務記錄日誌等處理。我們也可以設定消費端的監聽個數和最大個數,用於控制消費端的併發情況。我們要開啟限流,指定每次處理訊息最多隻能處理兩條訊息。

spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


#設定消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#消費者最小數量
spring.rabbitmq.listener.simple.concurrency=1
#消費之最大數量
spring.rabbitmq.listener.simple.max-concurrency=10

#在單個請求中處理的訊息個數,他應該大於等於事務數量(unack的最大數量)
spring.rabbitmq.listener.simple.prefetch=2

我們可以使用 @RabbitListener@RabblitHandler組合來監聽佇列,當然@RabbitListener 也可以加在方法上。我們這裡是建立了兩個方法用來監聽同一個佇列,具體呼叫哪個方法是通過匹配方法的入參來決定的,自定義型別的訊息需要標註@Payload,類要實現序列化介面。

package com.anqi.mq.receiver;

import com.anqi.mq.bean.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;


@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "boot.queue1", durable = "true"),
                exchange = @Exchange(value = "BOOT-EXCHANGE-1", type = "topic", durable = "true", ignoreDeclarationExceptions = "true"),
                key = "boot.*"
        )
)
@Component
public class MQReceiver {

    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //手工ack
        channel.basicAck(deliveryTag,true);
        System.out.println("receive--1: " + new String(message.getBody()));
    }

   @RabbitHandler
    public void onUserMessage(@Payload User user, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ack
        channel.basicAck(deliveryTag,true);
        System.out.println("receive--11: " + user.toString());
    }
}

訊息的序列化與反序列化由內部轉換器完成,如果我們要採用其他型別的訊息轉換器,我們可以對其進行設定SimpleMessageListenerContainer

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setMessageConverter(new Jackson2JsonMessageConverter());
        // 預設採用下面的這種轉換器
        // container.setMessageConverter(new SimpleMessageConverter());
        return container;
    }

單元測試類

import com.anqi.mq.bean.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

    @Autowired
    private MQSender mqSender;

    @Test
    public void send() {
        String msg = "hello spring boot";
        try {
            for (int i = 0; i < 15; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //mqSender.send(msg + ":" + i, null);
                mqSender.sendUser(new User("anqi", 25));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

測試結果如下,我們在消費方法使用了Thread.sleep(5000)來模擬訊息的處理過程,故意的延長了訊息的處理時間,從而更好的觀察限流效果。我們可以發現Unacked一直是 2, 代表正在處理的訊息數量為 2,這與我們限流的數量一致,說明了限流的目的已經實現。

相關推薦

SpringBoot 整合 RabbitMQ包含訊息確認機制以及消費

目錄 說明 生產端 消費端 說明 本文 SpringBoot 與 RabbitMQ 進行整合的時候,包含了三種訊息的確認模式,如果查詢詳細的確認模式設定,請閱讀:RabbitMQ的三種訊息確認

SpringBoot學習筆記05——SpringBoot整合RabbitMQ

下面我們來學習一下rabbitMQ消費者配置,話不多說直接上程式碼。 1.向application.properties檔案中新增配置 #rabbitMQ的 5672 埠 spring.rabbitmq.addresses=192.168.31.199:32771 #使用者名稱密碼 spri

SpringBoot學習筆記04——SpringBoot整合RabbitMQ

首先需要搭建一個RabbitMQ的服務,我是在docker跑了一個rabbitMQ的服務, docker的命令語句  docker run --name rabbit -P -d rabbitmq:3-management 映射出來的埠號如下圖 rabbitMQ這裡我

單例模式新談包含方式

  設計模式是一套被反覆使用,多數人知曉,經過分類編目的,程式碼設計的總結,也可以說是前人的智慧結晶。學習設計模式能讓我們對一些應用場景使用相同的套路達到很好的效果,我會不定時更新一些自己對設計模式的理解的文章,從定義,實現,應用場景來說說設計模式,今天我要說的物件是單例模式一,定義  什麼是單例模式,字面理

springboot 整合 rabbitMQ

轉載自  https://www.cnblogs.com/ityouknow/p/6120544.html   RabbitMQ 即一個訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發的作用。 訊息中介軟體在網際網路公司的使用中

設計模式之工廠模式包含模式

設計模式是一套被反覆使用,多數人知曉,經過分類編目的,程式碼設計的總結,也可以說是前人的智慧結晶。學習設計模式能讓我們對一些應用場景使用相同的套路達到很好的效果,我會不定時更新一些自己對設計模式的理解的文章,從定義,實現,應用場景來說說設計模式,今天我要說的物件是工廠模式一:定義什麼是工廠?工廠模式是我們最常

SpringBoot整合RabbitMQ:簡單使用

我所用的Springboot 的版本是1.5.13先來看pom檔案:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns

RabbitMQ 訊息確認機制 以及 原理解析

一、場景   當訊息的投送方把訊息投遞出去,卻不知道訊息是否投遞成功了。如果訊息投送方不管的話,勢必對系統的造成可靠性的影響。 可是如果要保證系統的可靠性,訊息投靠方,如何知道訊息是否投放成功了呢? 這個就需要訊息的確認機制,我們來看下rabbitMQ的訊息

RabbitMQ使用場景練習:訊息確認機制(十一)

訊息確認機制RabbitMQ提供了transaction、confirm兩種訊息確認機制。transaction即事務機制,手動提交和回滾;confirm機制提供了Confirmlistener和waitForConfirms兩種方式。confirm機制效率明顯會高於tran

RabbitMQ 消費、TTL、死信佇列

目錄 消費端限流 TTL 死信佇列 消費端限流 1. 為什麼要對消費端限流 假設一個場景,首先,我們 Rabbitmq 伺服器積壓了有上萬條未處理的訊息,我們隨便開啟一個消費者客戶端,會出現這

學習之路-RabbitMQSpringBoot整合RabbitMQ

一:引入RabbitMQ的相關jar包: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp

訊息中介軟體——RabbitMQ快速入門生產者與消費者,SpringBoot整合RabbitMQ

前言 本章我們來一次快速入門RabbitMQ——生產者與消費者。需要構建一個生產端與消費端的模型。什麼意思呢?我們的生產者傳送一條訊息,投遞到RabbitMQ叢集也就是Broker。 我們的消費端進行監聽RabbitMQ,當發現佇列中有訊息後,就進行消費。 1. 環境準備 本次整合主要採用Spring

SpringBoot整合RabbitMQ之典型應用場景實戰

分布 boot 自動刪除 blog jce 地址 這樣的 實施 微服務 實戰前言RabbitMQ 作為目前應用相當廣泛的消息中間件,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模塊中具有重要的作用,比如業務服務模塊解耦、異步通信、高並發限流

企業級 SpringBoot 教程 十五Springboot整合RabbitMQ

vmware builder ring boot () 清單 mil throws www 這篇文章帶你了解怎麽整合RabbitMQ服務器,並且通過它怎麽去發送和接收消息。我將構建一個springboot工程,通過RabbitTemplate去通過MessageListen

【Spring Boot】30SpringBoot整合RabbitMQ

1、安裝 1.1、Erlang: Erlang下載地址,下載後安裝即可。 1.2、RabbitMQ安裝 RabbitMQ下載地址,下載後安裝即可。 注意:Erlang的版本要與RabbitMQ版本需要匹配才行。 RabbitMQ Mini

rabbitmq訊息確認機制ack

接上一篇文章,在application配置檔案中新增如下配置: ## 訊息手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual 這樣就開啟了訊息手動確認,然後再消費者端程式碼中加上如下程式碼進行業務處理完後的訊息確認刪除訊息

SpringBoot整合RabbitMQ,實現訊息傳送和消費

下載安裝Erlang和RabbitMQ Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html   專案建立和依賴 推薦SpringCloud專案線上建立:https://start.spring.io/ 不用上面這

SpringBoot整合RabbitMQ訊息佇列

RabbitMQ 即一個訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發的作用。 訊息中介軟體在網際網路公司的使用中越來越多,剛才還看到新聞阿里將RocketMQ捐獻給了apache,當然了今天的主角還是講RabbitMQ。訊息中介軟體最主要的作用是解耦,中介軟體最

RabbitMQ ——與SpringBoot整合並實現訊息確認機制

不囉嗦直接上程式碼 目錄結構如下: pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

RabbitMQ知識盤點【壹】_訊息佇列介紹及訊息路由模式

最近在看訊息佇列的書籍,把一些收穫總結一下。 首先說說什麼是訊息佇列。這裡就不說那種教科書的定義了,以我的理解,訊息佇列就是通過接收和傳送訊息,使不同的應用系統連線起來。實現了業務系統的解耦,也跨越