訊息中介軟體之Rabbitmq
- RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。RabbitMQ主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生資料時,消費者無法快速消費,那麼需要一箇中間層。儲存這個資料。
- AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。
- RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
安裝
-
docker pull rabbitmq:3.7-management
-
docker run --name rabbitmq -p 15672:15672 -p 5672:5672 -d df80af9ca0c9
- 安裝執行成功之後訪問:
http://[ip]:15672
即可登入 -
四種類型的交換器(exchange)
- ofollow,noindex">https://baijiahao.baidu.com/s?id=1577456875919174629&wfr=spider&for=pc
direct(點對點、單播,直連)
- 直連型交換機(direct exchange)是根據訊息攜帶的路由鍵(routing key)將訊息投遞給對應佇列的
fanout(扇形,廣播模式,訂閱模式)
- 扇型交換機(funout exchange)將訊息路由給繫結到它身上的所有佇列。不同於直連交換機,路由鍵在此型別上不啟任務作用。如果N個佇列繫結到某個扇型交換機上,當有訊息傳送給此扇型交換機時,交換機會將訊息的傳送給這所有的N個佇列
- 路由鍵對這個交換機 不起作用,只要傳送給扇形交換機的訊息,那麼都會發送給和其繫結的所有佇列
topic(主題)
-
直連交換機的
routing_key
方案非常簡單,如果我們希望一條訊息傳送給多個佇列,那麼這個交換機需要繫結上非常多的routing_key
,假設每個交換機上都繫結一堆的routing_key
連線到各個佇列上。那麼訊息的管理就會異常地困難。 -
所以
RabbitMQ
提供了一種主題交換機,傳送到主題交換機上的訊息需要攜帶指定規則的routing_key
,主題交換機會根據這個規則將資料傳送到對應的(多個)佇列上。主題交換機的
routing_key
需要有一定的規則,交換機和佇列的binding_key
需要採用*.#.*.....
的格式,每個部分用.
分開,其中:-
*
表示一個 單詞-
rabbit.*
能夠匹配到rabbit.new
-
rabbit.*
不能夠匹配到rabbit.new.old
-
-
#
表示任意數量(零個或多個) 單詞 。-
rabbit.#
能夠匹配到rabbit.new
-
rabbit.#
能夠匹配到rabbit.new.old
-
- 假設有一條訊息的
routing_key
為fast.rabbit.white
,那麼帶有這樣binding_key
的幾個佇列都會接收這條訊息
-
-
header(頭,首部)
- 類似主題交換機,但是頭交換機使用多個訊息屬性來代替路由鍵建立路由規則。通過判斷訊息頭的值能否與指定的繫結相匹配來確立路由規則。
- 此交換機有個重要引數:”x-match”
- 當”x-match”為“any”時,訊息頭的任意一個值被匹配就可以滿足條件
交換機屬性
- 除交換機型別外,在宣告交換機時還可以附帶許多其他的屬性,其中最重要的幾個分別是:
Name Durability Auto-delete
Queue【佇列】
- 基本的屬性如下:
name durable exclusive autoDelete arguments
springBoot整合RabbitMQ
入門
- 新增依賴
<!-- rabbitmq啟動器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- 配置連線資訊
spring.rabbitmq.host=192.168.0.86## 主機地址 spring.rabbitmq.port=5672## 埠 spring.rabbitmq.username=admin## 使用者名稱 spring.rabbitmq.password=123456## 密碼 spring.rabbitmq.virtual-host=/## 虛擬主機,這裡的使用者名稱和密碼一定要對這個虛擬主機有許可權
- 配置一個Topic交換機和對應的佇列,配置類如下,會自動建立
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Topic交換機的配置類 * 1、配置完成之後,當使用到的時候會自動建立,不需要手動的建立,當然使用rabbitAdmin也是可以手動建立的 */ @Configuration//指定這是一個配置類 public class TopicConfig{ /** * 建立佇列 queue1 * @return */ @Bean public Queue queue1(){ return new Queue("queue_1",true); } /** * 建立佇列 queue2 * @return */ @Bean public Queue queue2(){ //指定名稱和持久化 return new Queue("queue_2",true); } /** * 建立topic交換機 */ @Bean public TopicExchange topic1(){ return new TopicExchange("topic_1"); } /** * 將交換機topic1和佇列queue1通過路郵鍵message_1繫結在一起 * @param topic1 交換機1 ,這裡通過名稱匹配,因為是通過@Bean自動注入的 * @param queue1 佇列1這裡通過名稱匹配,因為是通過@Bean自動注入的 * @return */ @Bean public Binding bindTopic1AndQueu1(TopicExchange topic1,Queue queue1 ){ return BindingBuilder.bind(queue1).to(topic1).with("message_1"); } /** * 將交換機topic1和佇列queue2通過路郵鍵message_2繫結在一起 * @param topic1 * @param queue1 * @return */ @Bean public Binding bindTopic1AndQueu2(TopicExchange topic1,Queue queue2 ){ return BindingBuilder.bind(queue2).to(topic1).with("message_2"); } }
- 啟動類添加註解
@EnableRabbit
@EnableRabbit//開啟rabbitmq @SpringBootApplication public class DemoApplicationextends SpringBootServletInitializer{ public static void main(String[] args){ SpringApplication.run(DemoApplication.class, args); } //繼承SpringBootServletInitializer實現war包的釋出 @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application){ return application.sources(DemoApplication.class); } }
- 傳送訊息和接受訊息
@Service public class RabbitServiceImplimplements RabbitServie{ @Resource private RabbitTemplate rabbitTemplate傳送訊息; //使用rabbitTemplate傳送訊息 @Override public void send(){ Map<String, Object> map=new HashedMap(); map.put("name", "陳加兵"); rabbitTemplate.convertAndSend("topic_1", "message_1", map); } //使用rabbitTemplate接收訊息 @Override public void get(){ Map<String, Object> map=(Map<String, Object>) rabbitTemplate.receiveAndConvert("queue_1"); System.out.println(map); } }
RabbitTemplate
- springBoot自動注入,直接使用即可
- 實體類傳送訊息之前一定需要序列化
- 用於傳送和接收訊息
方法
-
void convertAndSend(String exchange, String routingKey, final Object object)
:傳送訊息exchange routingKey object
-
Object receiveAndConvert(String queueName)
:接收指定佇列的訊息-
queueName
:訊息佇列的名字
-
RabbitAdmin
- springBoot已經為我們自動注入了AmqpAdmin,用於建立交換機、佇列、繫結
@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } }
- 測試類如下:
@RunWith(SpringRunner.class) @SpringBootTest // springBoot測試類,可以自定義測試類,不過需要引用這兩個註解 public class RabbitMqTest{ @Resource private AmqpAdmin amqpAdmin;//自動注入即可 @Test public void test4(){ DirectExchange directExchange = new DirectExchange("test_direct"); Queue queue = new Queue("direct_1", true); // 建立一個直連的交換機 amqpAdmin.declareExchange(directExchange); // 建立一個佇列 amqpAdmin.declareQueue(queue); //建立繫結關係 amqpAdmin.declareBinding(BindingBuilder.bind(queue) .to(directExchange).with("direct_message")); } }
訊息監聽
@RabbitListener @EnableRabbit
@RabbitListener
- 訊息監聽的註解,可以監聽一個或者多個佇列,一旦佇列中有了資訊,那麼就會執行,一旦被執行就意味著這條訊息被消費了(不一定,後面會講到訊息確認機制,這裡是預設會被消費的)
/** * rabbitmq的訊息處理類 * @author 陳加兵 */ @Component//注入 public class MessageHandler{ /** * 使用@RabbitListener這個註解監聽指定的佇列,一旦這個佇列有了訊息,那麼將會執行 * @param log 訊息的內容,如果接收的訊息內容是log物件,那麼將會被反序列化,存入這個log中 * 訊息一旦被監聽到了並且被執行了,那麼這條佇列的訊息將會被刪除了 */ @RabbitListener(queues={"direct_1"}) public void received(Log log){ System.out.println("------接收到訊息----"); System.out.println("訊息內容為:"+log); } /** * 使用org.springframework.amqp.core.Message物件來接收訊息,可以顯示訊息頭一些資訊 * @param message */ @RabbitListener(queues={"direct_1"}) public void received1(Message message){ System.out.println("------接收到訊息1----"); byte[] body=message.getBody(); System.out.println(message.getMessageProperties()); } }
@RabbitHandler
@RabbitListener @RabbitListener
/** * 處理訊息的類,使用@RabbitListener監聽佇列,結合@RabbitHandler處理不同內容型別的訊息 * @author Administrator * */ @RabbitListener(queues={"direct_1"})//監聽direct_1這個佇列的訊息 @Component//注入 public class RabbitMessage{ /** * 定義處理的方法是接收內容的型別為Log型別 * @param log */ @RabbitHandler public void receivedLog(Log log){ System.out.println("接收了log物件"); System.out.println(log); } /** * 定義接收內容為User型別的訊息 * @param user */ @RabbitHandler public void receivedMap(User user){ System.out.println("接收了user物件"); System.out.println(user); } }
訊息確認(SpringBoot整合)
-
訊息確認可以分為 事務模式 (類似jdbc的操作), confirm 模式(可以使用非同步回撥模式,更加高效)
-
rabbitmq預設是自動確認的,即是一條訊息被髮送了或者被消費了,無論你生產者或者消費者有沒有傳送或者消費成功,那麼都是自動確認為已傳送或者已接收了,但是在業務中接收了一條訊息不一定就是成功消費了,如果這個業務沒有正常完成,我們希望的是能夠訊息回滾,就像是mysql的事務機制,因此此時我們就需要手動確認這條訊息被消費了,而不是自動確認
- 訊息確認可以分為事務模式(類似jdbc的操作),confirm模式,具體的可以參考 https://blog.csdn.net/u013256816/article/details/55515234
confirm模式
-
confirm不同於事務模式的地方是可以使用非同步的確認模式
-
在配置檔案中配置,如下:
# 開啟發送確認 spring.rabbitmq.publisher-confirms=true # 開啟發送失敗退回 spring.rabbitmq.publisher-returns=true # 開啟ACK,開啟之後只有手動提交才會消費訊息 spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
傳送訊息的確認
ConfirmCallback ReturnCallback
/** * 訊息傳送的業務層 * SendMessageService : 傳送訊息的介面 * ConfirmCallback : 訊息傳送成功的回撥介面 * ReturnCallback : 訊息傳送失敗的回撥介面(找不到對應的路由或者因為各種原因訊息沒有成功投遞到rabbitmq中都會出發回調) * @author 陳加兵 * @since 2018年11月15日 下午4:45:37 */ @Service public class SendMessageServiceImplimplements SendMessageService,ConfirmCallback,ReturnCallback{ @Resource private RabbitTemplate rabbitTemplate;//注入rabbitMq的template,用於傳送和消費訊息 private Logger logger=LoggerFactory.getLogger(SendMessageServiceImpl.class);//日誌 /** * 訊息傳送失敗的回撥方法,實現ReturnCallback介面的方法 * 1、訊息沒有投遞成功,包括沒有找到對應的佇列或者路由鍵 */ @Override public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) { logger.info("返回的失敗程式碼="+replyCode+" 返回的失敗資訊="+replyText); logger.info("交換機="+exchange+" 繫結的路由鍵="+routingKey); } /** * 訊息傳送確認的回撥方法 * 如果訊息沒有到exchange,則confirm回撥,ack=false * 如果訊息到達exchange,則confirm回撥,ack=true * 判斷訊息有沒有成功傳送,只需要判斷ack的值,correlationData是傳送訊息的時候傳遞過來的值(String) */ @Override public void confirm(CorrelationData correlationData,boolean ack, String cause) { //如果ack==true,表示訊息傳送成功 if (ack) { logger.info("訊息傳送成功,下面開始處理業務。。。。。。。。。。。。。。。"); logger.info("correlationData="+correlationData); }else { logger.info("訊息傳送失敗。。。。。。。。。。。。。。。。"); logger.info("cause="+cause); } } /** * 傳送訊息的方法 */ @Override public void sendMessage(Log log)throws Exception { rabbitTemplate.setConfirmCallback(this);//設定 rabbitTemplate.setReturnCallback(this); CorrelationData data=new CorrelationData(); data.setId("success");//定義內容,在訊息傳送成功的回撥方法中可以獲取這個值 rabbitTemplate.convertAndSend("amq.direct", "message", log,data);//傳送訊息 } }
消費訊息的確認
- 開啟ack之後,預設是不會自動消費的,只有手動ack才會被消費
- 手動ack和nack使用的類是
com.rabbitmq.client.Channel
-
channel.basicAck()
:手動ack-
deliveryTag
:該訊息的index -
multiple
:是否批量,如果為true
將一次性ack所有小於deliveryTag的訊息,如果為false
,那麼將ack當前的訊息
-
-
channel.basicNack(deliveryTag, multiple, requeue)
-
deliveryTag
:該訊息的index -
multiple
:是否批量,如果為true
將一次性ack所有小於deliveryTag的訊息,如果為false
,那麼將ack當前的訊息 -
requeue
:被丟棄訊息是否重新進入佇列,如果是true將會重新進入佇列
-
-
- 例項如下
import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import com.techwells.demo.domain.Log; /** * 監聽佇列direct_1 * @author 陳加兵 * @since 2018年11月15日 下午6:55:56 */ @Component @RabbitListener(queues={"direct_1"}) public class ReceivedMessageHandler{ private Logger logger=LoggerFactory.getLogger(ReceivedMessageHandler.class); /** * 接收訊息型別為Log的訊息 * 如果不手動提交的話,預設是不會被自動確認消費的,只有手動提交了,才會被真正的消費 * @param log訊息的實體類 * @param channel * @param messagerabbitmq的Message類 * @throws IOException */ @RabbitHandler//處理訊息 public void handleMessage(Log log,Channel channel,Message message)throws IOException{ logger.info("成功接收到訊息........"+log); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手動提交ack,消費訊息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); logger.info("成功被消費。。。。。。。。。。"); } /** * 處理訊息型別為String型別的訊息 * @param str * @param channel * @param message * @throws IOException */ @RabbitHandler//處理訊息 public void handleStringMessage(String str,Channel channel,Message message)throws IOException{ logger.info("成功接收到訊息........"+str); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//nack,不消費這條訊息,一般是業務失敗才會不消費 } }