1. 程式人生 > >SpringBoot(八) Spring和消息隊列RabbitMQ

SpringBoot(八) Spring和消息隊列RabbitMQ

tap [] 應用 三種 jms 鏈接 安裝 tps name

概述

1.大多數應用中,可以通過消息服務中間件來提升系統異步能力和拓展解耦能力。

2.消息服務中的兩個重要概念:消息代理(Message broker)目的地(destination)

當消息發送者發送消息後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。

3.消息隊列主要有兩種形式的目的地:

    1. 隊列:點對點方式通信(point-to-point)
    2. 主題:發布/訂閱消息服務

點對點式:消息發送者發送消息後,消息代理將其放入一個隊列中,消息接受者從隊列中讀取數據,接受者接收數據後,將消息移除隊列。

發布訂閱:消息發布者將消息發布到主題中,多個接受者可以訂閱主題,當消息到達時,所有的訂閱者都會接收到消息。

4.JMS(Java Message Service) Java消息服務:基於JVM消息代理的規範。

5.AMQP(Advanced Message Queuing Protocol):它是一個面向消息中間件的開放式標準應用層協議。兼容JMS,RabbitMQ是AMQP的一個實現。


JMS
AMQP
定義 Java API 網絡線級協議
跨平臺
跨語言
Model (1)、Peer-2-Peer
(2)、Pub/Sub
(1)、direct exchange
(2)、fanout exchange
(3)、topic change
(4)、headers exchange
(5)、system exchange
後四種都是pub/sub ,差別路由機制做了更詳細的劃分
支持消息類型 TextMessage
MapMessage
ByteMessage
StreamMessage
ObjectMessage
Message
byte[]通常需要序列化

RabbitMQ

Message:消息頭和消息體組成,消息體是不透明的,而消息頭上則是由一系列的可選屬性組成,屬性:路由鍵【routing-key】,優先級【priority】,指出消息可能需要持久性存儲【delivery-mode】

Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序

Exchange:交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列

Exchange的4中類型:direct【默認】點對點,fanout,topic和headers, 發布訂閱,不同類型的Exchange轉發消息的策略有所區別

Queue:消息隊列,用來保存消息直到發送給消費者,它是消息的容器,也是消息的終點,一個消息可投入一個或多個隊列,消息一直在隊列裏面,等待消費者連接到這個隊列將數據取走。

Binding:綁定,隊列和交換機之間的關聯,多對多關系

Connection:網絡連接,例如TCP連接

Channel:信道,多路復用連接中的一條獨立的雙向數據流通道,信道是建立在真是的TCP鏈接之內的虛擬連接AMQP命令都是通過信道發送出去的。不管是發布消息,訂閱隊列還是接受消息,都是信道,減少TCP的開銷,復用一條TCP連接。

Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端的 應用程序

VirtualHost:小型的rabbitMQ,相互隔離

Broker:表示消息隊列 服務實體

技術分享圖片

Exchange的三種方式

direct:根據路由鍵直接匹配,一對一

fanout:不經過路由鍵,直接發送到每一個隊列

topic:類似模糊匹配的根據路由鍵,來分配綁定的隊列。#匹配一個或者多個單詞,*匹配一個單詞

RabbitMQ安裝與使用

在RabbitMQ官網的下載頁面https://www.rabbitmq.com/download.html中,我們可以獲取到針對各種不同操作系統的安裝包和說明文檔。這裏,我們將對幾個常用的平臺一一說明。

下面我們采用的Erlang和RabbitMQ Server版本說明:

  • Erlang/OTP 19.1
  • RabbitMQ Server 3.6.5

Windows安裝

  1. 安裝Erland,通過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接打開並完成安裝。
  2. 安裝RabbitMQ,通過官方下載頁面https://www.rabbitmq.com/download.html獲取exe安裝包。
  3. 下載完成後,直接運行安裝程序。
  4. RabbitMQ Server安裝完成之後,會自動的註冊為服務,並以默認配置啟動起來。

Docker安裝

1、打開虛擬機,在docker中安裝RabbitMQ

#1.安裝rabbitmq,使用鏡像加速
docker pull registry.docker-cn.com/library/rabbitmq:3-management
[root@node1 ~]# docker images
REPOSITORY                                     TAG                 IMAGE ID            CREATED             SIZE
registry.docker-cn.com/library/rabbitmq        3-management        c51d1c73d028        11 days ago         149 MB
#2.運行rabbitmq
##### 端口:5672 客戶端和rabbitmq通信 15672:管理界面的web頁面

docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028

#3.查看運行
docker ps

2、打開網頁客戶端並登陸 網址:http://localhost:15672/,賬號【guest】,密碼【guest】,登陸。

技術分享圖片

3、添加 【direct】【faout】【topic】的綁定關系等

4、發布信息測試 。

SpringBoot整合RabbitMQ

(1)Java代碼的方式使用RabbitMQ

1.在pom.xml文件中添加依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.3.7.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.在application.properties中配置關於RabbitMQ的連接和用戶信息,用戶可以回到上面的安裝內容,在管理頁面中創建用戶。

spring:
  rabbitmq:
    host: 192.168.1.125
    port: 5672
    username: guest
    password: guest

3.創建消息生產者。通過註入RabbitTemplate接口的實例來實現消息的發送,RabbitTemplate接口定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來註入其具體實現。

 @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads() {
        //Message需要自己構建一個;定義消息體內容和消息頭
        // rabbitTemplate.send(exchange, routingKey, message);
        //Object 默認當成消息體,只需要傳入要發送的對象,自動化序列發送給rabbitmq;
        Map<String,Object> map = new HashMap<>();
        map.put("msg", "這是第一個信息");
        map.put("data", Arrays.asList("helloWorld",123,true));
        //對象被默認序列以後發送出去 exchange 和 routingKey都是在瀏覽器端定義好的。
        rabbitTemplate.convertAndSend("exchange.direct","test.news",map);
    }

4.JSON序列化

@Configuration
public class MyAMQPConfig  {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

5.接收消息(取出隊列中的消息)

@Test
public void reciverAndConvert(){

    Object o = rabbitTemplate.receiveAndConvert("test.news");
    System.out.println(o.getClass());
    System.out.println(o);

}

(2)註解方式使用RabbitMQ

1.主程序開啟RabbitMQ的註解

@EnableRabbit //開啟基於註解的rabbitmq
@SpringBootApplication
public class AmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqpApplication.class, args);
    }
}

2.使用註解的方式接收

@Service
public class BookService {
    @RabbitListener(queues = "test.news")
    public void receive(Book book){
        System.out.println(book);
    }

    @RabbitListener(queues = "test")
    public void receive02(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }
}

(3)創建 Exchange(交換器)、Queue(消息隊列)、Bind(綁定規則)--- AmqpAdmin。

1.創建一個Exange

@Test
public void createExchange(){
    amqpAdmin.declareExchange(new DirectExchange("amqpadmin.direct"));
    System.out.println("Create Finish");
}

2.創建Queue

@Test
public void createQueue(){

    //參數1:名字  參數2:是否持久化  
    amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
    System.out.println("Create Queue Finish");
}

3.創建Bind規則

@Test
public void createBind(){
    //參數1:目的地    參數2:類型(隊列或者交換器)   參數3:exchange的名稱    參數4:路由件       參數5:參數
    amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE , "amqpadmin.direct", "amqp.haha", null));
}

4.刪除

@Test
public void deleteExchange(){
    amqpAdmin.deleteExchange("amqpadmin.direct");
    System.out.println("delete Finish");
}

SpringBoot(八) Spring和消息隊列RabbitMQ