MQ訊息佇列--RabbitMQ整合Spring理論及例項講解
今天Boss叫我去他的小黑屋分配任務,出門就記得倆詞“MQ”、“訊息佇列”。從來都沒聽說過這讓我怎麼搞?對於這種情況我慣有的方法論就是:先搞清楚它是什麼、有什麼用、有什麼工具可用、怎麼用,然後就是……擼起袖子使勁幹吧!
1、什麼是訊息佇列
訊息是指在兩個獨立的系統間傳遞的資料,這兩個系統可以是兩臺計算機,也可以是兩個程序。
訊息可以非常簡單,可以是簡單的字串,也可以是儲存了資料持久化的各種型別的文件集合。
佇列是在訊息的傳輸過程中的通道,是儲存訊息的容器,根據不同的情形,可以有先進先出,優先順序佇列等區別 。
通俗的講:
1、訊息佇列就兩個東西: 一個生產者一個消費者,生產者負責往訊息佇列裡面插入訊息,消費者負責從訊息佇列中取出訊息進行處理
2、訊息佇列就是一個容器,你把訊息丟進去,不需要立即處理。然後有個程式去從你的容器裡面把訊息一條條讀出來處理。
3、訊息佇列首先是個佇列。佇列的操作有入隊和出隊。也就是你有一個程式在產生內容然後入隊(生產者) 另一個程式讀取內容,內容出隊(消費者)
作用:降低耦合;訊息可以暫時存在在訊息佇列中,等待訊息接收者根據自身的負載處理能力控制處理訊息的處理速度,減小在大併發訪問時候的壓力。
使用場景:當你不需要立即獲得結果,但是併發量又不能無限大的時候,差不多就是你需要使用訊息佇列的時候。比如你寫日誌,因為可能一個客戶端有多個操作去寫,又有很多個客戶端,顯然併發不能無窮大,於是你就需要把寫日誌的請求放入到訊息佇列裡,在消費者那邊依次把佇列中產生的日誌寫到資料庫裡。
2、為什麼使用訊息佇列
—-訊息佇列主要的意義是解耦和非同步處理,以及在高併發場景下平滑短時間內大量的服務請求。
—-訊息佇列不僅被用於系統內部元件之間的通訊,同時也被用於系統跟其它服務之間的互動。
—-訊息佇列的使用可以增加系統的可擴充套件性、靈活性和使用者體驗。
—-基於訊息佇列可以將系統中各元件解除耦合,這樣系統就不再受最慢元件的束縛,各元件可以非同步執行從而得以更快的速度完成各自的工作。
—-非基於訊息佇列的系統,其執行速度取決於系統中最慢的元件的速度(注:短板效應)。
—-訊息佇列能夠將業務邏輯解耦,呼叫方只需要下達命令而不用等待整個邏輯執行完畢。除此之外訊息佇列也可以抑制效能波峰的產生,在瞬時業務增長產生時保持效能曲線的平滑。
3、使用訊息佇列的 10 個理由
1. 解耦
在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息佇列在處理過程中間插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面。這允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。
2. 冗餘
有時在處理資料的時候處理過程會失敗。除非資料被持久化,否則將永遠丟失。訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。在被許多訊息佇列所採用的”插入-獲取-刪除”正規化中,在把一個訊息從佇列中刪除之前,需要你的處理過程明確的指出該訊息已經被處理完畢,確保你的資料被安全的儲存直到你使用完畢。
3. 擴充套件性
因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變程式碼、不需要調節引數。擴充套件就像調大電力按鈕一樣簡單。
4. 靈活性 & 峰值處理能力
當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不同尋常的水平。在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。請檢視我們關於峰值處理能力的部落格文章瞭解更多此方面的資訊。
5. 可恢復性
當體系的一部分元件失效,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。而這種允許重試或者延後處理請求的能力通常是造就一個略感不便的使用者和一個沮喪透頂的使用者之間的區別。
6. 送達保證
訊息佇列提供的冗餘機制保證了訊息能被實際的處理,只要一個程序讀取了該佇列即可。
7.排序保證
在許多情況下,資料處理的順序都很重要。訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。
8.緩衝
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,載入一張圖片比應用過濾器花費更少的時間。訊息佇列通過一個緩衝層來幫助任務最高效率的執行–寫入佇列的處理會盡可能的快速,而不受從佇列讀的預備處理的約束。該緩衝有助於控制和優化資料流經過系統的速度。
9. 理解資料流
在一個分散式系統裡,要得到一個關於使用者操作會用多長時間及其原因的總體印象,是個巨大的挑戰。訊息系列通過訊息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的資料流都不夠優化。
10. 非同步通訊
很多時候,你不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許你把一個訊息放入佇列,但並不立即處理它。你想向佇列中放入多少訊息就放多少,然後在你樂意的時候再去處理它們。
4、基於RabbitMQ中介軟體的訊息佇列功能的實現
4.1、rabbitMQ的特點
- 高可用高併發,適合叢集伺服器。
- 健壯、穩定、易用、跨平臺、支援多種語言、文件齊全。
- 有訊息確認機制和持久化機制,可靠性高。
- 開源
4.2、幾個概念說明
- producer:訊息生產者。
- consumer:訊息的消費者。
Queue: 訊息佇列。提供了FIFO的處理機制,具有快取訊息的能力。rabbitmq中,佇列訊息可以設定為持久化、臨時、自動刪除。
——設定為持久化的佇列,queue中的訊息會在server本地硬碟儲存一份,防止系統crash,資料丟失。
——設定為臨時佇列,queue中的資料在系統重啟之後就會丟失。
——設定為自動刪除的佇列,當不存在使用者連線到server,佇列中的資料會被自動刪除Exchange。Exchange:類似於資料通訊網路中的交換機,提供訊息路由策略。 rabbitmq中,producer不是通過通道直接將訊息傳送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行繫結,producer在傳遞訊息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將訊息路由給指定的queue。和Queue一樣,Exchange也可設定為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(預設)、fanout、topic、headers,不同型別的Exchange轉發訊息的策略有所區別:
——Direct:直接交換器,工作方式類似於單播,Exchange會將訊息傳送完全匹配ROUTING_KEY的Queue
——fanout:廣播式交換器,不管訊息的ROUTING_KEY設定為什麼,Exchange都會將訊息轉發給所有繫結的Queue。
——topic:主題交換器,工作方式類似於組播,Exchange會將訊息轉發和ROUTING_KEY匹配模式相同的所有佇列,比如,ROUTING_KEY為user.stock的Message會轉發給繫結匹配模式為 .stock,user.stock, * . * 和#.user.stock.#的佇列。( * 表是匹配一個任意片語,#表示匹配0個或多個片語)
——headers:訊息體的header匹配(ignore)。- Binding: 所謂繫結就是將一個特定的 Exchange 和一個特定的 Queue 繫結起來。Exchange 和Queue的繫結可以是多對多的關係。
- virtual host: 在rabbitmq server上可以建立多個虛擬的message broker,又叫做virtual hosts (vhosts)。 每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。 vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的執行在不同的vhost例項上,相互之間不會干擾。producer和consumer連線rabbit server需要指定一個vhost。
4.3、訊息佇列的使用過程
- 客戶端連線到訊息佇列伺服器,開啟一個channel。
- 客戶端宣告一個exchange,並設定相關屬性。
- 客戶端宣告一個queue,並設定相關屬性。
- 客戶端使用routing key,在exchange和queue之間建立好繫結關係。
- 客戶端投遞訊息到exchange。
- exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡
5、訊息佇列RabbitMQ與Spring整合例項
5.1、Spring整合RabbitMQ
1、maven配置:
//pom.xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
2、rabbmitmq配置:
//rabbitmq-config.properties
mq.host=127.0.0.1
mq.username=test
mq.password=123456
mq.port=5672
mq.vhost=testmq
3、Spring配置:
//application-mq.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<description>rabbitmq 連線服務配置</description>
<!-- 連線配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- spring template宣告-->
<rabbit:template exchange="amqpExchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
<!-- 訊息物件json轉換類 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
</beans>
5.2、 在Spring中使用RabbitMQ
1、申明一個訊息佇列Queue:
//application-mq.xml
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
說明:
durable:是否持久化
exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除
auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
2、交換機定義:
//application-mq.xml
<rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue_key" key="test_queue_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
說明:
rabbit:direct-exchange:定義exchange模式為direct,意思就是訊息與一個特定的路由鍵完全匹配,才會轉發。
rabbit:binding:設定訊息queue匹配的key
3、傳送訊息Producer:
//MQProducer.java
public interface MQProducer {
/**
* 傳送訊息到指定佇列
* @param queueKey
* @param object
*/
public void sendDataToQueue(String queueKey, Object object);
}
@Service
public class MQProducerImpl implements MQProducer {
@Autowired
private AmqpTemplate amqpTemplate;
private final static Logger LOGGER = Logger.getLogger(MQProducerImpl.class);
/* (non-Javadoc)
* @see com.stnts.tita.rm.api.mq.MQProducer#sendDataToQueue(java.lang.String, java.lang.Object)
*/
@Override
public void sendDataToQueue(String queueKey, Object object) {
try {
amqpTemplate.convertAndSend(queueKey, object);
} catch (Exception e) {
LOGGER.error(e);
}
}
}
說明:
convertAndSend:將Java物件轉換為訊息傳送到匹配Key的交換機中Exchange,由於配置了JSON轉換,這裡是將Java物件轉換成JSON字串的形式。原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
4、非同步接收訊息Consumer:
—-定義監聽器
//QueueListenter.java
@Component
public class QueueListenter implements MessageListener {
@Override
public void onMessage(Message msg) {
try{
System.out.print(msg.toString());
}catch(Exception e){
e.printStackTrace();
}
}
}
—-監聽器配置
//application-mq.xml
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="test_queue_key" ref="queueListenter"/>
</rabbit:listener-container>
說明:
queues:監聽的佇列,多個的話用逗號(,)分隔
ref:監聽器
5、JUnit測試:
//TestQueue.java
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
"classpath:/ApplicationContext/ApplicationContext-mq.xml"})
public class TestQueue{
@Autowired
MQProducer mqProducer;
final String queue_key = "test_queue_key";
@Test
public void send(){
Map<String,Object> msg = new HashMap()<>;
msg.put("data","hello,rabbmitmq!");
mqProducer.sendDataToQueue(query_key,msg);
}
}
執行測試程式,Run with JUnit,會發送一條訊息到test_queue,監聽器監聽到訊息後,打印出訊息。
至此,已經完成了spring和RabbmitMQ整合,配置,和使用。