SpringBoot(八) Spring和訊息佇列RabbitMQ
概述
1.大多數應用中,可以通過訊息服務中介軟體來提升系統非同步能力和拓展解耦能力。
2.訊息服務中的兩個重要概念: 訊息代理(Message broker) 和 目的地(destination)
當訊息傳送者傳送訊息後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
3.訊息佇列主要有兩種形式的目的地:
-
- 佇列: 點對點方式通訊 (point-to-point)
- 主題: 釋出/訂閱訊息服務
點對點式:訊息傳送者傳送訊息後,訊息代理將其放入一個佇列中,訊息接受者從佇列中讀取資料,接受者接收資料後,將訊息移除佇列。
釋出訂閱:訊息釋出者將訊息釋出到主題中,多個接受者可以訂閱主題,當訊息到達時,所有的訂閱者都會接收到訊息。
4.JMS(Java Message Service) Java訊息服務:基於JVM訊息代理的規範。
5.AMQP(Advanced Message Queuing Protocol):它是一個面向訊息中介軟體的開放式標準應用層協議。相容JMS,RabbitMQ是AMQP的一個實現。
JMS | AMQP | |
---|---|---|
定義 | Java API | 網路線級協議 |
跨平臺 | 否 | 是 |
跨語言 | 否 | 是 |
Model | (1)、Peer-2-Peer (2)、Pub/Sub |
(1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 後四種都是pub/sub ,差別路由機制做了更詳細的劃分 |
支援訊息型別 | TextMessage MapMessage ByteMessage StreamMessage ObjectMessage Message |
byte[]通常需要序列化 |
RabbitMQ
Message:訊息頭和訊息體組成,訊息體是不透明的,而訊息頭上則是由一系列的可選屬性組成,屬性:路由鍵【routing-key】,優先順序【priority】,指出訊息可能需要永續性儲存【delivery-mode】
Publisher:訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式
Exchange:交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列
Exchange的4中型別:direct【預設】點對點,fanout,topic和headers, 釋出訂閱,不同型別的Exchange轉發訊息的策略有所區別
Queue:訊息佇列,用來儲存訊息直到傳送給消費者,它是訊息的容器,也是訊息的終點,一個訊息可投入一個或多個佇列,訊息一直在佇列裡面,等待消費者連線到這個佇列將資料取走。
Binding:繫結,佇列和交換機之間的關聯,多對多關係
Connection:網路連線,例如TCP連線
Channel:通道,多路複用連線中的一條獨立的雙向資料流通道,通道是建立在真是的TCP連結之內的虛擬連線AMQP命令都是通過通道傳送出去的。不管是釋出訊息,訂閱佇列還是接受訊息,都是通道,減少TCP的開銷,複用一條TCP連線。
Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端的 應用程式
VirtualHost:小型的rabbitMQ,相互隔離
Broker:表示訊息佇列 服務實體
Exchange的三種方式
direct:根據路由鍵直接匹配,一對一
fanout:不經過路由鍵,直接傳送到每一個佇列
topic:類似模糊匹配的根據路由鍵,來分配繫結的佇列。#匹配一個或者多個單詞,*匹配一個單詞
RabbitMQ安裝與使用
在RabbitMQ官網的下載頁面 https://www.rabbitmq.com/download.html
中,我們可以獲取到針對各種不同作業系統的安裝包和說明文件。這裡,我們將對幾個常用的平臺一一說明。
下面我們採用的Erlang和RabbitMQ Server版本說明:
- Erlang/OTP 19.1
- RabbitMQ Server 3.6.5
Windows安裝
http://www.erlang.org/downloads https://www.rabbitmq.com/download.html
Docker安裝
1、開啟虛擬機器,在docker中安裝RabbitMQ
#1.安裝rabbitmq,使用映象加速 docker pull registry.docker-cn.com/library/rabbitmq:3-management [root@node1 ~]# docker images REPOSITORYTAGIMAGE IDCREATEDSIZE registry.docker-cn.com/library/rabbitmq3-managementc51d1c73d02811 days ago149 MB #2.執行rabbitmq ##### 埠:5672 客戶端和rabbitmq通訊 15672:管理介面的web頁面 docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028 #3.檢視執行 docker ps
2、開啟網頁客戶端並登陸 網址:http://localhost:15672/,賬號【guest】,密碼【guest】,登陸。
3、新增 【direct】【faout】【topic】的繫結關係等
4、釋出資訊測試 。
SpringBoot整合RabbitMQ
(1)Java程式碼的方式使用RabbitMQ
1.在pom.xml檔案中新增依賴
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.7.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
2.在 application.properties
中配置關於RabbitMQ的連線和使用者資訊,使用者可以回到上面的安裝內容,在管理頁面中建立使用者。
spring: rabbitmq: host: 192.168.1.125 port: 5672 username: guest password: guest
3.建立訊息生產者。通過注入Rabbit Template
介面的例項來實現訊息的傳送,Rabbit Template
介面定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入其具體實現。
@Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //Message需要自己構建一個;定義訊息體內容和訊息頭 // rabbitTemplate.send(exchange, routingKey, message); //Object 預設當成訊息體,只需要傳入要傳送的物件,自動化序列傳送給rabbitmq; Map<String,Object> map = new HashMap<>(); map.put("msg", "這是第一個資訊"); map.put("data", Arrays.asList("helloWorld",123,true)); //物件被預設序列以後傳送出去 exchange 和 routingKey都是在瀏覽器端定義好的。 rabbitTemplate.convertAndSend("exchange.direct","test.news",map); }
4.JSON序列化
@Configuration public class MyAMQPConfig{ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
5.接收訊息(取出佇列中的訊息)
@Test public void reciverAndConvert(){ Object o = rabbitTemplate.receiveAndConvert("test.news"); System.out.println(o.getClass()); System.out.println(o); }
(2)註解方式使用RabbitMQ
1.主程式開啟RabbitMQ的註解
@EnableRabbit //開啟基於註解的rabbitmq @SpringBootApplication public class AmqpApplication { public static void main(String[] args) { SpringApplication.run(AmqpApplication.class, args); } }
2.使用註解的方式接收
@Service public class BookService { @RabbitListener(queues = "test.news") public void receive(Book book){ System.out.println(book); } @RabbitListener(queues = "test") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
(3)建立 Exchange(交換器)、Queue(訊息佇列)、Bind(繫結規則)--- AmqpAdmin。
1.建立一個Exange
@Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.direct")); System.out.println("Create Finish"); }
2.建立Queue
@Test public void createQueue(){ //引數1:名字引數2:是否持久化 amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); System.out.println("Create Queue Finish"); }
3.建立Bind規則
@Test public void createBind(){ //引數1:目的地引數2:型別(佇列或者交換器)引數3:exchange的名稱引數4:路由件引數5:引數 amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE , "amqpadmin.direct", "amqp.haha", null)); }
4.刪除
@Test public void deleteExchange(){ amqpAdmin.deleteExchange("amqpadmin.direct"); System.out.println("delete Finish"); }