過期時間TTL
過期時間TTL表示可以對訊息設定預期的時間,在這個時間內都可以被消費者接收穫取;
過了之後訊息將自動被刪除。
RabbitMQ可以對訊息和佇列設定TTL。目前有兩種方法可以設定。
- 第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。
- 第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。
如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。
訊息在佇列的生存時間一旦超過設定的TTL值,就稱為dead message被投遞到死信佇列, 消費者將無法再收到該訊息。
設定佇列TTL
//設定相關的配置,也可以在Web介面中設定
package com.zwt.springbootfanout.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class TTLRabbitMqConfiguration {
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl_direct_exchange", true, true);
}
@Bean
public Queue tttDireclQueue() {
HashMap<String, Integer> map = new HashMap<>();
map.put("x-message-ttl",2000);
return new Queue("ttl.direct.queue",true);
}
@Bean
public Binding ttlDirectBinding() {
return BindingBuilder.bind(tttDireclQueue()).to(ttlDirectExchange()).with("ttl");
}
}
expiration 欄位以微秒為單位表示 TTL 值。
且與 x-message-ttl 具有相同的約束條件。
因為 expiration 欄位必須為字串型別,broker 將只會接受以字串形式表達的數字。
當同時指定了 queue 和 message 的 TTL 值,則兩者中較小的那個才會起作用。
訊息確認機制的配置
NONE值是禁用釋出確認模式,是預設值
CORRELATED值是釋出訊息成功到交換器後會觸發回撥方法
SIMPLE值經測試有兩種效果
其一效果和CORRELATED值一樣會觸發回撥方法,
其二在釋出訊息成功後使用rabbitTemplate呼叫waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回傳送結果,
根據返回結果來判定下一步的邏輯,
要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法傳送訊息到broker;
死信佇列
DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。
當訊息在一個佇列中變成死信(dead message)之後,它能被重新發送到另一個交換機中,這個交換機就是DLX ,繫結DLX的佇列就稱之為死信佇列。
訊息變成死信,可能是由於以下的原因:
- 訊息被拒絕
- 訊息過期
- 佇列達到最大長度
DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的佇列上被指定,實際上就是設定某一個佇列的屬性。
當這個佇列中存在死信時,Rabbitmq就會自動地將這個訊息重新發布到設定的DLX上去,進而被路由到另一個佇列,即死信佇列。
要想使用死信佇列,只需要在定義佇列的時候設定佇列引數 x-dead-letter-exchange
指定交換機即可。
記憶體磁碟的監控
當記憶體使用超過配置的閾值或者磁碟空間剩餘空間對於配置的閾值時,
RabbitMQ會暫時阻塞客戶端的連線,並且停止接收從客戶端發來的訊息,以此避免伺服器的崩潰,
客戶端與服務端的心態檢測機制也會失效。
當出現blocking或blocked話說明到達了閾值和以及高負荷運行了。
RabbitMQ的記憶體控制
參考幫助文件:https://www.rabbitmq.com/configure.html
當出現警告的時候,可以通過配置去修改和調整。
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 為記憶體閾值。
預設情況是:0.4/2GB,代表的含義是:當RabbitMQ的記憶體超過40%時,就會產生警告並且阻塞所有生產者的連線。
通過此命令修改閾值在Broker重啟以後將會失效,通過修改配置檔案方式設定的閾值則不會隨著重啟而消失,但修改了配置檔案一樣要重啟broker才會生效。
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
配置檔案方式 rabbitmq.conf
當前配置檔案:/etc/rabbitmq/rabbitmq.conf
#預設
#vm_memory_high_watermark.relative = 0.4
# 使用relative相對值進行設定fraction,建議取值在04~0.7之間,不建議超過0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的絕對值的方式,但是是KB,MB,GB對應的命令如下
vm_memory_high_watermark.absolute = 2GB
RabbitMQ的記憶體換頁
在某個Broker節點及記憶體阻塞生產者之前,它會嘗試將佇列中的訊息換頁到磁碟以釋放記憶體空間,
持久化和非持久化的訊息都會寫入磁碟中,
其中持久化的訊息本身就在磁碟中有一個副本,所以在轉移的過程中持久化的訊息會先從記憶體中清除掉。
預設情況下,記憶體到達的閾值是50%時就會換頁處理。
也就是說,在預設情況下該記憶體的閾值是0.4的情況下,當記憶體超過0.4*0.5=0.2時,會進行換頁動作。
比如有1000MB記憶體,當記憶體的使用率達到了400MB,
已經達到了極限,但是因為配置的換頁記憶體0.5,這個時候會在達到極限400mb之前,會把記憶體中的200MB進行轉移到磁碟中。從而達到穩健的執行。
可以通過設定 vm_memory_high_watermark_paging_ratio
來進行調整。
RabbitMQ的磁碟預警
當磁碟的剩餘空間低於確定的閾值時,RabbitMQ同樣會阻塞生產者,
這樣可以避免因非持久化的訊息持續換頁而耗盡磁碟空間導致伺服器崩潰。
預設情況下:磁碟預警為50MB的時候會進行預警。
表示當前磁碟空間第50MB的時候會阻塞生產者並且停止記憶體訊息換頁到磁碟的過程。
通過命令方式修改如下:
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定單位 KB MB GB
fraction :是相對閾值,建議範圍在:1.0~2.0之間。(相對於記憶體)
通過配置檔案配置如下:
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb
叢集
RabbitMQ 叢集
RabbitMQ這款訊息佇列中介軟體產品本身是基於Erlang編寫,Erlang語言天生具備分散式特性(通過同步Erlang叢集各節點的magic cookie來實現)。
因此,RabbitMQ天然支援Clustering。
這使得RabbitMQ本身不需要像ActiveMQ、Kafka那樣通過ZooKeeper分別來實現HA方案和儲存叢集的元資料。
叢集是保證可靠性的一種方式,同時可以通過水平擴充套件以達到增加訊息吞吐量能力的目的。
叢集搭建
ps aux|grep rabbitmq
systemctl status rabbitmq-server
場景:假設有兩個rabbitmq節點,分別為rabbit-1, rabbit-2,rabbit-1作為主節點,rabbit-2作為從節點。
啟動命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
結束命令:rabbitmqctl -n rabbit-1 stop
第一步:啟動第一個節點rabbit-1
> sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
...............省略...................
########## Logs: /var/log/rabbitmq/rabbit-1.log
###### ## /var/log/rabbitmq/rabbit-1-sasl.log
##########
Starting broker...
completed with 7 plugins.
啟動第二個節點rabbit-2
注意:web管理外掛端口占用,所以還要指定其web外掛佔用的埠號
RABBITMQ_SERVER_START_ARGS=”-rabbitmq_management listener [{port,15673}]”
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
..............省略..................
########## Logs: /var/log/rabbitmq/rabbit-2.log
###### ## /var/log/rabbitmq/rabbit-2-sasl.log
##########
Starting broker...
completed with 7 plugins.
驗證啟動 “ps aux|grep rabbitmq”
rabbit-1操作作為主節點
#停止應用
> sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清除節點上的歷史資料(如果不清除,無法將節點加入到叢集)
> sudo rabbitmqctl -n rabbit-1 reset
#啟動應用
> sudo rabbitmqctl -n rabbit-1 start_app
rabbit2操作為從節點
# 停止應用
> sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除節點上的歷史資料(如果不清除,無法將節點加入到叢集)
> sudo rabbitmqctl -n rabbit-2 reset
# 將rabbit2節點加入到rabbit1(主節點)叢集當中【Server-node伺服器的主機名】
> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'
# 啟動應用
> sudo rabbitmqctl -n rabbit-2 start_app
驗證叢集狀態
> sudo rabbitmqctl cluster_status -n rabbit-1
//叢集有兩個節點:rabbit-1@Server-node、rabbit-2@Server-node
[{nodes,[{disc,['rabbit-1@Server-node','rabbit-2@Server-node']}]},
{running_nodes,['rabbit-2@Server-node','rabbit-1@Server-node']},
{cluster_name,<<"[email protected]">>},
{partitions,[]},
{alarms,[{'rabbit-2@Server-node',[]},{'rabbit-1@Server-node',[]}]}]
注意在訪問的時候:web結面的管理需要給15672 node-1 和15673的node-2 設定使用者名稱和密碼。
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
Tips:
如果採用多機部署方式,需讀取其中一個節點的cookie, 並複製到其他節點(節點之間通過cookie確定相互是否可通訊)。
cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主機名分別為rabbit-1、rabbit-2
1、逐個啟動各節點
2、配置各節點的hosts檔案( vim /etc/hosts)
ip1:rabbit-1
ip2:rabbit-2
其它步驟雷同單機部署方式
分散式事務
分散式事務指事務的操作位於不同的節點上,需要保證事務的 AICD 特性。
例如:在下單場景下,庫存和訂單如果不在同一個節點上,就涉及分散式事務。
分散式事務的方式
在分散式系統中,要實現分散式事務,無外乎那幾種解決方案。
兩階段提交(2PC)需要資料庫產商的支援,java元件有atomikos等
兩階段提交(Two-phase Commit,2PC),通過引入協調者(Coordinator)來協調參與者的行為,並最終決定這些參與者是否要真正執行事務。
準備階段
協調者詢問參與者事務是否執行成功,參與者發回事務執行結果。
提交階段
如果事務在每個參與者上都執行成功,事務協調者傳送通知讓參與者提交事務;
否則,協調者傳送通知讓參與者回滾事務。
需要注意的是,在準備階段,參與者執行了事務,但是還未提交。只有在提交階段接收到協調者發來的通知後,才進行提交或者回滾。
存在的問題
- 同步阻塞 所有事務參與者在等待其它參與者響應的時候都處於同步阻塞狀態,無法進行其它操作。
- 單點問題 協調者在 2PC 中起到非常大的作用,發生故障將會造成很大影響。特別是在階段二發生故障,所有參與者會一直等待狀態,無法完成其它操作。
- 資料不一致 在階段二,如果協調者只發送了部分 Commit 訊息,此時網路發生異常,那麼只有部分參與者接收到 Commit 訊息,也就是說只有部分參與者提交了事務,使得系統資料不一致。
- 太過保守 任意一個節點失敗就會導致整個事務失敗,沒有完善的容錯機制。
補償事務(TCC) 嚴選,阿里,螞蟻金服
TCC 其實就是採用的補償機制,其核心思想是:針對每個操作,都要註冊一個與其對應的確認和補償(撤銷)操作。
它分為三個階段:
Try 階段主要是對業務系統做檢測及資源預留。
Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,
預設 - - - Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
舉個例子,假入 Bob 要向 Smith 轉賬,
思路大概是: 我們有一個本地方法,裡面依次呼叫
1:首先在 Try 階段,要先呼叫遠端介面把 Smith 和 Bob 的錢給凍結起來。
2:在 Confirm 階段,執行遠端呼叫的轉賬的操作,轉賬成功進行解凍。
3:如果第2步執行成功,那麼轉賬成功,如果第二步執行失敗,則呼叫遠端凍結介面對應的解凍方法 (Cancel)。
優點: 跟2PC比起來,實現以及流程相對簡單了一些,但資料的一致性比2PC也要差一些
缺點: 缺點還是比較明顯的,在2,3步中都有可能失敗。
TCC屬於應用層的一種補償方式,所以需要程式設計師在實現的時候多寫很多補償的程式碼,在一些場景中,一些業務流程可能用TCC不太好定義及處理。
本地訊息表(非同步確保)比如:支付寶、微信支付主動查詢支付狀態,對賬單的形式
本地訊息表與業務資料表處於同一個資料庫中,這樣就能利用本地事務來保證在對這兩個表的操作滿足事務特性,並且使用了訊息佇列來保證最終一致性。
在分散式事務操作的一方完成寫業務資料的操作之後向本地訊息表傳送一個訊息,本地事務能保證這個訊息一定會被寫入本地訊息表中。
之後將本地訊息表中的訊息轉發到 Kafka 等訊息佇列中,如果轉發成功則將訊息從本地訊息表中刪除,否則繼續重新轉發。
在分散式事務操作的另一方從訊息佇列中讀取一個訊息,並執行訊息中的操作。
優點: 一種非常經典的實現,避免了分散式事務,實現了最終一致性。
缺點: 訊息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。
MQ 事務訊息 非同步場景,通用性較強,拓展性較高
有一些第三方的MQ是支援事務訊息的,比如RocketMQ,
他們支援事務訊息的方式也是類似於採用的二階段提交,
但是市面上一些主流的MQ都是不支援事務訊息的,比如 Kafka 不支援。
以阿里的 RabbitMQ 中介軟體為例,其思路大致為:
- 第一階段Prepared訊息,會拿到訊息的地址。
- 第二階段執行本地事務
- 第三階段通過第一階段拿到的地址去訪問訊息,並修改狀態。
也就是說在業務方法內要想訊息佇列提交兩次請求,一次傳送訊息和一次確認訊息。
如果確認訊息傳送失敗了RabbitMQ會定期掃描訊息叢集中的事務訊息,
這時候發現了Prepared訊息,它會向訊息傳送者確認,
所以生產方需要實現一個check介面,
RabbitMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。
這樣就保證了訊息傳送與本地事務同時成功或同時失敗。
優點: 實現了最終一致性,不需要依賴本地資料庫事務。
缺點: 實現難度大,主流MQ不支援,RocketMQ事務訊息部分程式碼也未開源。
通過本文我們總結並對比了幾種分散式分解方案的優缺點,分散式事務本身是一個技術難題,
是沒有一種完美的方案應對所有場景的,具體還是要根據業務場景去抉擇吧。
阿里RocketMQ去實現的分散式事務,現在也有除了很多分散式事務的協調器,比如LCN等,大家可以多去嘗試。
具體實現
分散式事務的完整架構圖
美團外賣的架構
系統與系統之間的分散式事務問題
系統間呼叫過程中事務回滾問題
import com.zwt.rabbitmq.dao.OrderDataBaseService;
import com.zwt.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
@Service
public class OrderService {
@Autowired
private OrderDataBaseService orderDataBaseService;
// 建立訂單
@Transactional(rollbackFor = Exception.class) // 訂單建立整個方法新增事務
public void createOrder(Order orderInfo) throws Exception {
// 1: 訂單資訊--插入丁訂單系統,訂單資料庫事務
orderDataBaseService.saveOrder(orderInfo);
// 2:通過Http介面傳送訂單資訊到運單系統
String result = dispatchHttpApi(orderInfo.getOrderId());
if(!"success".equals(result)) {
throw new Exception("訂單建立失敗,原因是運單介面呼叫失敗!");
}
}
/**
* 模擬http請求介面傳送,運單系統,將訂單號傳過去 springcloud
* @return
*/
private String dispatchHttpApi(String orderId) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// 連結超時 > 3秒
factory.setConnectTimeout(3000);
// 處理超時 > 2秒
factory.setReadTimeout(2000);
// 傳送http請求
String url = "http://localhost:9000/dispatch/order?orderId="+orderId;
RestTemplate restTemplate = new RestTemplate(factory);//異常
String result = restTemplate.getForObject(url, String.class);
return result;
}
}
基於MQ的分散式事務訊息的可靠生產問題
如果這個時候MQ伺服器出現了異常和故障,那麼訊息是無法獲取到回執資訊。
如何是好?
基於MQ的分散式事務訊息的可靠生產問題-定時重發
如果出現異常,咱們就重發訊息。
基於MQ的分散式事務訊息的可靠消費
基於MQ的分散式事務訊息的訊息重發
設定重試次數二定要進行控制或者 try/catch
基於MQ的分散式事務訊息的死信佇列訊息轉移 + 人工處理
如果死信佇列報錯就進行人工處理
總結
基於MQ的分散式事務解決方案優點:
1、通用性強
2、拓展方便
3、耦合度低,方案也比較成熟
基於MQ的分散式事務解決方案缺點:
1、基於訊息中介軟體,只適合非同步場景
2、訊息會延遲處理,需要業務上能夠容忍
建議
1、儘量去避免分散式事務
2、儘量將非核心業務做成非同步
Springboot整合rabbitmq叢集配置詳解
1 引入starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2:詳細配置如下
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client連線到的server的地址,多個以逗號分隔(優先取addresses,然後再取host)
# port:
##叢集配置 addresses之間用逗號隔開
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 連線到rabbitMQ的vhost
requested-heartbeat: #指定心跳超時,單位秒,0為不指定;預設60s
publisher-confirms: #是否啟用 釋出確認
publisher-reurns: # 是否啟用釋出返回
connection-timeout: #連線超時,單位毫秒,0表示無窮大,不超時
cache:
channel.size: # 快取中保持的channel數量
channel.checkout-timeout: # 當快取數量被設定時,從快取中獲取一個channel的超時時間,單位毫秒;如果為0,則總是建立一個新channel
connection.size: # 快取的連線數,只有是CONNECTION模式時生效
connection.mode: # 連線工廠快取模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否啟動時自動啟動容器
simple.acknowledge-mode: # 表示訊息確認方式,其有三種配置方式,分別是none、manual和auto;預設auto
simple.concurrency: # 最小的消費者數量
simple.max-concurrency: # 最大的消費者數量
simple.prefetch: # 指定一個請求能處理多少個訊息,如果有事務的話,必須大於等於transaction數量.
simple.transaction-size: # 指定一個事務處理的訊息數量,最好是小於等於prefetch的數量.
simple.default-requeue-rejected: # 決定被拒絕的訊息是否重新入隊;預設是true(與引數acknowledge-mode有關係)
simple.idle-event-interval: # 多少長時間釋出空閒容器時間,單位毫秒
simple.retry.enabled: # 監聽重試是否可用
simple.retry.max-attempts: # 最大重試次數
simple.retry.initial-interval: # 第一次和第二次嘗試釋出或傳遞訊息之間的間隔
simple.retry.multiplier: # 應用於上一重試間隔的乘數
simple.retry.max-interval: # 最大重試時間間隔
simple.retry.stateless: # 重試是有狀態or無狀態
template:
mandatory: # 啟用強制資訊;預設false
receive-timeout: # receive() 操作的超時時間
reply-timeout: # sendAndReceive() 操作的超時時間
retry.enabled: # 傳送重試是否可用
retry.max-attempts: # 最大重試次數
retry.initial-interval: # 第一次和第二次嘗試釋出或傳遞訊息之間的間隔
retry.multiplier: # 應用於上一重試間隔的乘數
retry.max-interval: #最大重試時間間隔
對於傳送方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin建立上一步的Exchange/Queue/Binding
4 配置RabbitTemplate用於傳送訊息,RabbitTemplate通過CachingConnectionFactory獲取到Connection,然後想指定Exchange傳送
對於消費方而言,需要做以下配置:
1 配置CachingConnectionFactory
2 配置Exchange/Queue/Binding
3 配置RabbitAdmin建立上一步的Exchange/Queue/Binding
4 配置RabbitListenerContainerFactory
5 配置@RabbitListener/@RabbitHandler用於接收訊息
3 Spring AMQP的主要物件
注:如果不瞭解AMQP請前往官網瞭解.
4 使用:
通過配置類載入的方式:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
public static final String RECEIVEDLXEXCHANGE="spring-ex";
public static final String RECEIVEDLXQUEUE="spring-qu1";
public static final String RECEIVEDLXROUTINGKEY="aa";
public static final String DIRECTEXCHANGE="spring-ex";
public static final String MDMQUEUE="mdmQueue";
public static final String TOPICEXCHANGE="spring-top";
@Value("${spring.rabbitmq.addresses}")
private String hosts;
@Value("${spring.rabbitmq.username}")
private String userName;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
/* @Value("${rabbit.channelCacheSize}")
private int channelCacheSize;*/
// @Value("${rabbit.port}")
// private int port;
/* @Autowired
private ConfirmCallBackListener confirmCallBackListener;
@Autowired
private ReturnCallBackListener returnCallBackListener;*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(hosts);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
// cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
//cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setVirtualHost(virtualHost);
//設定連線工廠快取模式:
cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
//快取連線數
cachingConnectionFactory.setConnectionCacheSize(3);
//設定連線限制
cachingConnectionFactory.setConnectionLimit(6);
logger.info("連線工廠設定完成,連線地址{}"+hosts);
logger.info("連線工廠設定完成,連線使用者{}"+userName);
return cachingConnectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
rabbitAdmin.declareBinding(bindingMdmQueue());
//宣告topic交換器
rabbitAdmin.declareExchange(directExchange());
logger.info("管理員設定完成");
return rabbitAdmin;
}
@Bean
public RabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//最小消費者數量
factory.setConcurrentConsumers(10);
//最大消費者數量
factory.setMaxConcurrentConsumers(10);
//一個請求最大處理的訊息數量
factory.setPrefetchCount(10);
//
factory.setChannelTransacted(true);
//預設不排隊
factory.setDefaultRequeueRejected(true);
//手動確認接收到了訊息
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
logger.info("監聽者設定完成");
return factory;
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECTEXCHANGE,true,false);
}
@Bean
public Queue mdmQueue(){
Map arguments = new HashMap<>();
// 繫結該佇列到私信交換機
arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);
arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);
logger.info("佇列交換機繫結完成");
return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);
}
@Bean
Binding bindingMdmQueue() {
return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
}
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMandatory(true);
//釋出確認
// rabbitTemplate.setConfirmCallback(confirmCallBackListener);
// 啟用釋出返回
// rabbitTemplate.setReturnCallback(returnCallBackListener);
logger.info("連線模板設定完成");
return rabbitTemplate;
}
/* @Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPICEXCHANGE,true,false);
}*/
/*
*//**
* @return DirectExchange
*//*
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);
}
*//*
*
* @return Queue
*//*
@Bean
public Queue dlxQueue() {
return new Queue(RECEIVEDLXQUEUE,true);
}
*//*
* @return Binding
*//*
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
}*/
}
通過兩種方式載入
1 通過配置檔案
2 通過配置類
說明:上面是通過配置檔案與配置類的方式去載入,常用的配置如上所示。
實際使用中要生產方與消費方要分開配置,相關配置也會有小變動,大體配置不變。更多資訊可檢視官網配置。
叢集監控
在廣大的網際網路行業中RabbitMQ幾乎都會有叢集,那麼對於叢集的監控就成了企業生態中必不可少的一環。
接下來我們來將講解主要的4種監控。
管理介面監控
管理介面監控需要我們開啟對應的外掛(rabbitmq-plugins enable rabbitmq_management)
然後訪問http://ip:15672
tracing日誌監控
以下是trace的相關命令和使用(要使用需要先rabbitmq啟用外掛,再開啟開關才能使用):
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 檢視外掛列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq啟用trace外掛 |
rabbitmqctl trace_on | 開啟trace的開關 |
rabbitmqctl trace_on -p itcast | 開啟trace的開關(itcast為需要日誌追蹤的vhost) |
rabbitmqctl trace_off | 關閉trace的開關 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq關閉Trace外掛 |
rabbitmqctl set_user_tags heima administrator | 只有administrator的角色才能檢視日誌介面 |
安裝外掛並開啟 trace_on 之後,會發現多個 exchange:amq.rabbitmq.trace ,型別為:topic。
日誌追蹤
rabbitTemplate.convertAndSend("spring_queue", "只發佇列spring_queue的訊息--01。");
定製自己的監控系統
RabbitMQ提供了很豐富的restful風格的api介面,
我們可以通過這些介面得到對應的叢集資料,此時我們就可以定製我們的監控系統。
更多API的相關資訊和描述可以訪問http://ip:15672/api/
接下來我們使用RabbitMQ Http API介面來獲取叢集監控資料
HttpClient以及Jackson的相關Jar
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
建立MonitorRabbitMQ類實現具體的程式碼
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* RabbitMQ的監控
*/
public class MonitorRabbitMQ {
//RabbitMQ的HTTP API——獲取叢集各個例項的狀態資訊,ip替換為自己部署相應例項的
private static String RABBIT_NODES_STATUS_REST_URL = "http://192.168.13.111:15672/api/nodes";
//RabbitMQ的HTTP API——獲取叢集使用者資訊,ip替換為自己部署相應例項的
private static String RABBIT_USERS_REST_URL = "http://192.168.13.111:15672/api/users";
//rabbitmq的使用者名稱
private static String RABBIT_USER_NAME = "guest";
//rabbitmq的密碼
private static String RABBIT_USER_PWD = "guest";
public static void main(String[] args) {
try {
//step1.獲取rabbitmq叢集各個節點例項的狀態資訊
Map<String, ClusterStatus> clusterMap =
fetchRabbtMQClusterStatus(RABBIT_NODES_STATUS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step2.列印輸出各個節點例項的狀態資訊
for (Map.Entry entry : clusterMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
//step3.獲取rabbitmq叢集使用者資訊
Map<String, User> userMap =
fetchRabbtMQUsers(RABBIT_USERS_REST_URL, RABBIT_USER_NAME, RABBIT_USER_PWD);
//step4.列印輸出rabbitmq叢集使用者資訊
for (Map.Entry entry : userMap.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static Map<String, ClusterStatus> fetchRabbtMQClusterStatus(String url, String username, String password) throws IOException {
Map<String, ClusterStatus> clusterStatusMap = new HashMap<String, ClusterStatus>();
String nodeData = getData(url, username, password);
JsonNode jsonNode = null;
try {
jsonNode = JsonUtil.toJsonNode(nodeData);
} catch (IOException e) {
e.printStackTrace();
}
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
ClusterStatus status = new ClusterStatus();
status.setDiskFree(next.get("disk_free").asLong());
status.setFdUsed(next.get("fd_used").asLong());
status.setMemoryUsed(next.get("mem_used").asLong());
status.setProcUsed(next.get("proc_used").asLong());
status.setSocketUsed(next.get("sockets_used").asLong());
clusterStatusMap.put(next.get("name").asText(), status);
}
return clusterStatusMap;
}
public static Map<String, User> fetchRabbtMQUsers(String url, String username, String password) throws IOException {
Map<String, User> userMap = new HashMap<String, User>();
String nodeData = getData(url, username, password);
JsonNode jsonNode = null;
try {
jsonNode = JsonUtil.toJsonNode(nodeData);
} catch (IOException e) {
e.printStackTrace();
}
Iterator<JsonNode> iterator = jsonNode.iterator();
while (iterator.hasNext()) {
JsonNode next = iterator.next();
User user = new User();
user.setName(next.get("name").asText());
user.setTags(next.get("tags").asText());
userMap.put(next.get("name").asText(), user);
}
return userMap;
}
public static String getData(String url, String username, String password) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(username, password);
HttpGet httpGet = new HttpGet(url);
httpGet.addHeader(BasicScheme.authenticate(creds, "UTF-8", false));
httpGet.setHeader("Content-Type", "application/json");
CloseableHttpResponse response = httpClient.execute(httpGet);
try {
if (response.getStatusLine().getStatusCode() != 200) {
System.out.println("call http api to get rabbitmq data return code: " + response.getStatusLine().getStatusCode() + ", url: " + url);
}
HttpEntity entity = response.getEntity();
if (entity != null) {
return EntityUtils.toString(entity);
}
} finally {
response.close();
}
return null;
}
public static class JsonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
//objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
}
public static JsonNode toJsonNode(String jsonString) throws IOException {
return objectMapper.readTree(jsonString);
}
}
public static class User {
private String name;
private String tags;
@Override
public String toString() {
return "User{" +
"name=" + name +
", tags=" + tags +
'}';
}
//GET/SET方法省略
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
}
public static class ClusterStatus {
private long diskFree;
private long diskLimit;
private long fdUsed;
private long fdTotal;
private long socketUsed;
private long socketTotal;
private long memoryUsed;
private long memoryLimit;
private long procUsed;
private long procTotal;
// 此處省略了Getter和Setter方法
public long getDiskFree() {
return diskFree;
}
public void setDiskFree(long diskFree) {
this.diskFree = diskFree;
}
public long getDiskLimit() {
return diskLimit;
}
public void setDiskLimit(long diskLimit) {
this.diskLimit = diskLimit;
}
public long getFdUsed() {
return fdUsed;
}
public void setFdUsed(long fdUsed) {
this.fdUsed = fdUsed;
}
public long getFdTotal() {
return fdTotal;
}
public void setFdTotal(long fdTotal) {
this.fdTotal = fdTotal;
}
public long getSocketUsed() {
return socketUsed;
}
public void setSocketUsed(long socketUsed) {
this.socketUsed = socketUsed;
}
public long getSocketTotal() {
return socketTotal;
}
public void setSocketTotal(long socketTotal) {
this.socketTotal = socketTotal;
}
public long getMemoryUsed() {
return memoryUsed;
}
public void setMemoryUsed(long memoryUsed) {
this.memoryUsed = memoryUsed;
}
public long getMemoryLimit() {
return memoryLimit;
}
public void setMemoryLimit(long memoryLimit) {
this.memoryLimit = memoryLimit;
}
public long getProcUsed() {
return procUsed;
}
public void setProcUsed(long procUsed) {
this.procUsed = procUsed;
}
public long getProcTotal() {
return procTotal;
}
public void setProcTotal(long procTotal) {
this.procTotal = procTotal;
}
@Override
public String toString() {
return "ClusterStatus{" +
"diskFree=" + diskFree +
", diskLimit=" + diskLimit +
", fdUsed=" + fdUsed +
", fdTotal=" + fdTotal +
", socketUsed=" + socketUsed +
", socketTotal=" + socketTotal +
", memoryUsed=" + memoryUsed +
", memoryLimit=" + memoryLimit +
", procUsed=" + procUsed +
", procTotal=" + procTotal +
'}';
}
}
}
Zabbix 監控RabbitMQ
Zabbix是一個基於WEB介面提供分散式系統監視以及網路監視功能的企業級開源解決方案,
他也可以幫助我們搭建一個MQ叢集的監控系統,同時提供預警等功能,
但是由於其搭建配置要求比較高一般都是由運維人員負責搭建,
感興趣的可以訪問https://www.zabbix.com/ 官網進行了解學習。
面試題分析
1、Rabbitmq 為什麼需要通道,為什麼不是TCP直接通訊
1、TCP的建立和銷燬,開銷大,建立要三次握手,銷燬要4次分手。
2、如果不用通道,那應用程式就會TCP連線到Rabbit伺服器,高峰時每秒成千上萬連線就會造成資源的巨大浪費,而且底層作業系統每秒處理tcp連線數也是有限制的,必定造成效能瓶頸。
3、通道的原理是一條執行緒一條通道,多條執行緒多條通道同用一條TCP連線,一條TCP連線可以容納無限的通道,即使每秒成千上萬的請求也不會成為效能瓶頸。
2:queue到底在消費者建立還是生產者建立?
1: 一般建議是在rabbitmq操作面板建立。這是一種穩妥的做法。
2:按照常理來說,確實應該消費者這邊建立是最好,訊息的消費是在這邊。
這樣你承受一個後果,可能我生產在生產訊息可能會丟失訊息。
3:在生產者建立佇列也是可以,這樣穩妥的方法,訊息是不會出現丟失。
4:如果你生產者和消費都建立的佇列,誰先啟動誰先建立,後面啟動就覆蓋前面的