1. 程式人生 > >SpringBoot整合RabbitMq

SpringBoot整合RabbitMq

tostring 易用性 toc 分布 code prior temp 模式匹配 推薦

1,該筆記主要是記錄自己學習Springboot整合RabbitMq過程,推薦一篇學習RabbitMq非常好的博客:http://blog.720ui.com/2017/rabbitmq_action_01_helloworld/

2,RabbitMq簡介:AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

3,基本概念介紹:

技術分享圖片,

3.1.Message消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

3.2.Publisher消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

3.3.Exchange交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

3.4.Binding綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。

3.5.Queue消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者連接到這個隊列將其取走。

3.6.Connection網絡連接,比如一個TCP連接。

3.7.Channel信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

3.8.Consumer消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

3.9.Virtual Host虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。

3.10.Broker表示消息隊列服務器實體。

4,交換器簡介:

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。下面只講前三種模式。

4.1.Direct模式

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配

4.2.Topic模式

topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞。

4.3.Fanout模式

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。

5,SpringBoot整合RabbitMq:

5.1,添加依賴:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

5.2,在application.yml中配置RabbitMq

spring
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

5.3,定義message

@Data
public class UserInfo implements Serializable {
    private Integer UserId;
    private String username;

    @Override
    public String toString() {
        return "UserInfo{" +
                "UserId=" + UserId +
                ", username=‘" + username + ‘\‘‘ +
                ‘}‘;
    }
}

5.4,配置消息隊列

@Configuration
public class QueueConfig {
    static final String QUEUE = "direct_queue";
    //direct模式
    @Bean
    public Queue directQueue(){
        return new Queue(QUEUE,true);
    }
}

5.5,生產者

@Component
@Slf4j
public class Sender {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(){
        UserInfo user = new UserInfo();
        user.setUserId(11);
        user.setUsername("hello word");
        log.info("send message to direct_queue,message is {}",user.toString());
        this.amqpTemplate.convertAndSend(QueueConfig.QUEUE,user);
    }
}

5.6,消費者

@Component
@Slf4j
public class Receiver {
    //在隊列上進行監聽
    @RabbitListener(queues = QueueConfig.QUEUE)
    public void receiverMessage(UserInfo userInfo){
     log.info("direct_queue get message is {}",userInfo.toString());
    }
}

5.7,測試

@RestController
public class MQControllerTest {
    @Autowired
    private Sender sender;

    @PostMapping(value = "/send/message")
    public String sendMessage(){
        sender.sendMessage();
        return "ok";

    }

}

6,其他的交換機模式

SpringBoot整合RabbitMq