1. 程式人生 > >SpringBoot整合RabbitMQ之傳送接收訊息實戰

SpringBoot整合RabbitMQ之傳送接收訊息實戰

實戰前言
前幾篇文章中,我們介紹了SpringBoot整合RabbitMQ的配置以及實戰了Spring的事件驅動模型,這兩篇文章對於我們後續實戰RabbitMQ其他知識要點將起到奠基的作用的。特別是Spring的事件驅動模型,當我們全篇實戰完畢RabbitMQ並大概瞭解一下RabbitMQ相關元件的原始碼時,會發現其中的ApplicationEvent、ApplicationListener、ApplicationEventPublisher跟RabbitMQ的Message、Listener、RabbitTemplate有“異曲同工之妙”,當然啦,其中更多有關聯關係的是它們的底層原始碼,感興趣的童鞋可以研究一番!

實戰概要
從本篇文章將開始採用SpringBoot整合RabbitMQ的方式來實戰相關知識要點、企業級應用業務模組以及微服務專案一些典型的問題。
本篇文章將介紹實戰RabbitMQ在SpringBoot專案中的基本應用,即如何建立佇列、交換機、路由及其繫結以及如何傳送接收訊息!

實戰歷程
前幾篇文章我們已經實現瞭如何採用IDEA開發工具實現SpringBoot整合RabbitMQ的配置,其中有一個相當重要的配置類 RabbitmqConfig.java ,我們將在這裡建立佇列、交換機、路由及其繫結,下面我們就建立一個簡單的訊息模型吧:DirectExchange+RoutingKey 。以下為建立佇列、交換機、路由及其繫結的相關資訊。

1、首先是application.properties配置檔案中配置的資訊
 

mq.env=local
basic.info.mq.exchange.name=${mq.env}:basic:info:mq:exchange
basic.info.mq.routing.key.name=${mq.env}:basic:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:basic:info:mq:queue 

2、RabbitmqConfig建立佇列、交換機、路由及其繫結

    //TODO:基本訊息模型構建
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(env.getProperty("basic.info.mq.exchange.name"), true,false);
    }

    @Bean(name = "basicQueue")
    public Queue basicQueue(){
        return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
    }

    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
    }

3、當我們在上面建立好佇列、交換機、路由及其繫結後,我們可以先把整個專案跑起來,然後開啟http://localhost:15672/  訪問RabbitMQ後端控制檯,點選 Queues、Exchanges 欄目,即可看到我們建立好的佇列、交換機。如下所示

4、現在可以說是萬事具備,只欠東風。建立好了佇列,自然是要來使用的,下面我就採用這條佇列來發送接收“簡單的字串資訊” 以及 “物件實體資訊”!在這裡,我們在Controller執行傳送邏輯,其中充當傳送訊息的元件是RabbitTemplate,充當訊息的元件為Message。如下所示RabbitController.java

@RestController
    public class RabbitController {
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
    private static final String Prefix="rabbit";

    @Autowired
    private Environment env;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;


    /**
     * 傳送簡單訊息
     * @param message
     * @return
     */
    @RequestMapping(value = Prefix+"/simple/message/send",method = RequestMethod.GET)
    public BaseResponse sendSimpleMessage(@RequestParam String message){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待發送的訊息: {} ",message);

            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            
            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("傳送簡單訊息發生異常: ",e.fillInStackTrace());
        }
        return response;
    }


    /**
     * 傳送物件訊息
     * @param user
     * @return
     */
    @RequestMapping(value = Prefix+"/object/message/send",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse sendObjectMessage(@RequestBody User user){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待發送的訊息: {} ",user);

            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("傳送物件訊息發生異常: ",e.fillInStackTrace());
        }
        return response;
    }}

5、在上面可以看到我們的傳送程式碼邏輯其實並不複雜,其思路主要是來源於第一階段的介紹訊息模型中的其中一種,如下圖所示。即我們是將訊息傳送到exchange,然後由於exchange與某個routingKey繫結路由到某個佇列queue,故而當訊息到達exchange後,將自然而然的被路由到指定的queue中,等待被監聽消費。

6、下面我們需要建立一個listener用於監聽消費此佇列中的訊息。程式碼邏輯如下CommonListener.java

	@Component
	public class CommonListener {

    private static final Logger log= LoggerFactory.getLogger(CommonListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 監聽消費訊息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收String
            String result=new String(message,"UTF-8");
            log.info("接收String訊息: {} ",result);
        }catch (Exception e){
            log.error("監聽消費訊息 發生異常: ",e.fillInStackTrace());
        }
    }}

7、緊接著,我們將整個專案跑起來,然後首先訪問 “http://127.0.0.1:9092/mq/rabbit/simple/message/send?message=簡單訊息模型2”  ,然後即可看到listener接收到改訊息,可以在控制檯列印輸出!

8、然後我們改造一下 CommonListener的監聽消費程式碼邏輯用於監聽消費物件實體的資訊,如下

/**
     * 監聽消費訊息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收物件
			User user=objectMapper.readValue(message, User.class);
			log.info("接收物件訊息: {} ",user);
        }catch (Exception e){
            log.error("監聽消費訊息 發生異常: ",e.fillInStackTrace());
        }
    }}

然後再將整個專案跑起來,在postman如下發送一個物件實體資訊

可以在listener打斷點啥的監聽期執行流程,然後即可看到listener監聽消費到了佇列中該物件實體訊息,如下

至此我們的佇列、交換機、路由的建立沒啥問題了,而且我們在控制檯觀察會發現,CommonListener.java中的@RabbitListener 註解的方法確實以 不同於 主執行緒 的非同步執行緒來執行的!正如上圖所看到的 那個就是執行緒Thread的ID。

9、上面的物件實體User的實體欄位資訊包含下面三個欄位

public class User implements Serializable{
    private Integer id;
    private String userName;
    private String name;

    public User(Integer id, String userName, String name) {
        this.id = id;
        this.userName = userName;
        this.name = name;
    }

    public User() {
    }

    public Integer getId() {
        return id;
    }

    //TODO:省略了getter/setter方法--也可以在類上面 @Data 加入 Lombok註解 - 當然啦前提是要加入Lombok依賴即可
    
    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", userName='" + userName + '\'' +
                ", name='" + name + '\'' +
                '}';
    }}

10、至此,我們的SpringBoot整合RabbitMQ實戰傳送接收訊息 已經介紹完畢!在上面我們的RabbitTemplate充當了傳送訊息的元件,這個元件在SpringBoot搭建的微服務專案中至關重要,我們還可以在其中設定其他的相關屬性,包括我們之前設定好的 “訊息回撥”、“訊息確認”等追溯的屬性!當然啦,在後面我們會發現 RabbitTemplate 傳送的訊息的方式有n多種,特別是在訊息確認方面,寫法也是有多種,在後面我們會慢慢以程式碼體現出來!但不管怎麼寫,我們還是要牢牢把握住上面最基本的寫法以及上面介紹的訊息模型圖:因為那就是傳送接收訊息的核心所在!

實戰總結
SpringBoot搭建的微服務專案目前也是越來越流行,而訊息中介軟體RabbitMQ應用也是越來越廣泛,本文我們介紹瞭如何在SpringBoot搭建的專案中利用SpringBoot提供的起步依賴、自動裝配等先天優勢來建立佇列、交換機、路由及其繫結並實現訊息的傳送監聽接收消費,其實這已經初步實現了業務服務模組之間的解耦。在下節我們將用此來實戰企業級應用、微服務專案中常見的業務模組:非同步寫使用者操作日誌;非同步傳送郵件!

實戰結語
最近博主將SpringBoot整合RabbitMQ這一系列的文章進行了精要抽取並在gitchat上進行了分享,感興趣的童鞋可以關注並加入我發起的文章交流會!掃一掃下面的二維碼即可哦


另外,相關的文章也會同步釋出在微信公眾號哦,感興趣的童鞋也可以關注關注。有相關問題可以加我QQ:1974544863進行交流或者加群進行討論:java開源技術交流-583522159

相關推薦

SpringBoot整合RabbitMQ傳送接收訊息實戰

實戰前言 前幾篇文章中,我們介紹了SpringBoot整合RabbitMQ的配置以及實戰了Spring的事件驅動模型,這兩篇文章對於我們後續實戰RabbitMQ其他知識要點將起到奠基的作用的。特別是Spring的事件驅動模型,當我們全篇實戰完畢RabbitMQ並大概瞭解一

SpringBoot整合RabbitMQ典型應用場景實戰

factor aid 分享圖片 actor esp rem 排隊 stc tps 實戰前言RabbitMQ 作為目前應用相當廣泛的消息中間件,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模塊中具有重要的作用,比如業務服務模塊解耦、異步通信、

SpringBoot整合RabbitMQ典型應用場景實戰

分布 boot 自動刪除 blog jce 地址 這樣的 實施 微服務 實戰前言RabbitMQ 作為目前應用相當廣泛的消息中間件,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模塊中具有重要的作用,比如業務服務模塊解耦、異步通信、高並發限流

SpringBoot整合RabbitMQ 典型應用場景實戰

實戰前言 RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。上一篇博文我分享了RabbitMQ在業務服務模組解耦,非

SpringBoot整合RabbitMQ 典型應用場景實戰

實戰前言 RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。 RabbitMQ 官網拜讀 首先,讓我們先拜讀

SpringBoot整合RabbitMQ典型應用場景實戰

實戰前言 RabbitMQ 作為目前應用相當廣泛的訊息中介軟體,在企業級應用、微服務應用中充當著重要的角色。特別是在一些典型的應用場景以及業務模組中具有重要的作用,比如業務服務模組解耦、非同步通訊、高併發限流、超時業務、資料延遲處理等。 RabbitMQ 官網拜讀 首先

SpringBoot整合RabbitMQ發送接收消息實戰

container 會同 prope spring 註解 流行 pin public lin 實戰前言 前幾篇文章中,我們介紹了SpringBoot整合RabbitMQ的配置以及實戰了Spring的事件驅動模型,這兩篇文章對於我們後續實戰RabbitMQ其他知識要點將起到奠

SpringBoot整合RabbitMQSpring事件驅動模型

實戰背景:在進入RabbitMQ各大技術知識點之前,我們先來談談跟事件驅動息息相關的ApplicationEvent、ApplicationListener以及ApplicationEventPublisher這三大元件,點選進去看其原始碼可以發現裡面使用的CachingConnectionFa

SpringBoot整合RabbitMQ整合配置篇

實戰背景:RabbitMQ實戰第一階段-RabbitMQ的官網拜讀已經結束了,相信諸位童鞋或多或少都能入了個門,如果還是覺得迷迷糊糊似懂非懂的,那我建議諸位可以親自去拜讀拜讀官網的技術手冊或者看多幾篇我的視訊跟原始碼!因為接下來我們將進入第二階段,即應用實戰階段。其中,第一階段的內容因為屬於入門

SpringBoot整合RabbitMQ基礎例項2

此文承接SpringBoot整合RabbitMQ之基礎例項1的所有配置 配置交換機,佇列及繫結關係 package com.etoak.crazy.config.rabbitmq; import org.springframework.amqp.core.Binding; im

SpringBoot整合RabbitMQ基礎例項1

在pom檔案中匯入依賴 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</

SpringBoot整合RabbitMQDirect Exchange交換機(學習總結)

一、前言 在總結Spring Cloud Bus訊息匯流排的時候,需要用到RabbitMQ訊息中介軟體,由於之前對MQ還不是很熟悉,所以花了一點時間研究了一下RabbitMQ。 二、簡介 RabbitMQ 是一個訊息中介軟體,以非同步的方式處理訊息,實現了與業務之間的解

SpringBoot整合RabbitMQTopic Exchange萬用字元交換機(學習總結)

一、簡介 Topic Exchange交換機也叫萬用字元交換機,我們在傳送訊息到Topic Exchange的時候不能隨意指定route key(應該是由一系列點號連線的字串,一般會與binding key有關聯,route key的長度一般不能超過255個位元組)。同理,

使用RabbitMQ簡單傳送接收訊息

舉個例子,假如你想投遞一封郵件,你可以將郵件投遞到某個郵箱,然後郵遞員從郵箱中獲取郵件,並將郵件交付到接收方,在這個過程中,RabbitMQ就 類似於郵箱 、郵局和郵遞員,RabbitMQ是一個訊息佇列,它可以接收程式傳送的訊息,然後放入到相應的訊息佇列中,另外一些程式可以從

ROS速成傳送&接收訊息

人真的老了,扔了個週末,完全不記得幹了什麼...論紀錄的重要性啊,當時覺得明白的很,你扔兩天試試?扔一年試試?扔幾年試試? 最近參加的各種專案腦疼眼乏,一天要同時推進三個專案,且都是技術一線,各專案之間相關性不強,往往是第一個專案上午幹了記錄下,下午又投入到第二個專案中去,

springboot整合rabbitMQ物件傳輸

rabbitMQ的安裝方法網上有很多教程,這裡就不重複了。 在springboot上使用rabbitMQ傳輸字串和物件,本文所給出的例子是在兩個不同的專案之間進行物件和和字串的傳輸。 rabbitMQ的依賴(在兩個專案中一樣的配置):

SpringBoot整合RabbitMQ,實現訊息傳送和消費

下載安裝Erlang和RabbitMQ Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html   專案建立和依賴 推薦SpringCloud專案線上建立:https://start.spring.io/ 不用上面這

(九) RabbitMQ實戰教程(面向Java開發人員)SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ 使用SpringBoot整合RabbitMQ非常簡單,它極大程度的簡化了開發成本,使用SpringBoot整合RabbitMQ需匯入如下依賴 <parent> <groupId>o

springboot整合rabbitmq,根據查詢的資訊建立多個訊息中心和訊息佇列,並實現不同的訊息傳送到不同的訊息中心

      今天接到一個需求,就是在傳送訊息到rabbitmq訊息中心的時候,需要根據裝置型別,將訊息傳送到不同的訊息佇列,因此要建立不同的訊息佇列。       修改之前是把配置資訊寫在配置文中,專案啟動時,獲取配置檔案中的配置資訊,建立訊息佇列。       修改後的邏輯

springboot整合rabbitMq實現訊息延時傳送

實現思路:利用mq的ttl設定訊息失效時間 當達到設定時間後通過交換機到達死信佇列中,消費者端繫結讀取死信佇列中資訊來達到延時傳送訊息的功能。 demo 如下: (1)在pom.xml 中引入rabbitMq相關包 <dependency>