1. 程式人生 > >異步通信rabbitmq——消息重試

異步通信rabbitmq——消息重試

rabbit 就是 消息 queue rejected send count 洛杉磯 周期

目標:

利用RabbitMQ實現消息重試和失敗處理,實現可靠的消費消費。在消息消費異常時,自動延時將消息重試,當重試超過一定次數後,則列為異常消息,等待後續特殊處理。

準備:

TTL:Time-To-Live,通過給消息、隊列設置過期時間(單位:毫秒),來控制消息、隊列的生命周期。在達到時間後,消息會變成dead message。

Dead Letter Exchanges:同普通的exchange無區別

消息重制本質是通過消息轉發來實現的。消息轉發的觸發是:

rejected - the message was rejected with requeue=false,

expired - the TTL of the message expired; or
maxlen - the maximum allowed queue length was exceeded.
這裏,我們使用expired來實現。給消息設置TTL,到期後消息未被消費,則會變成dead messager,轉發到dead letter exchange。

流程圖:

實現:

1、創建三個exchange。沒有特殊要求

2、創建三個queue。

clickQueue@retry作為重試隊列,需要特殊處理:

x-dead-letter-exchange: clickExchange

x-dead-letter-routing-key: clickKey

x-message-ttl: 30000

3、處理代碼

public void retry() throws IOException {
//消息消費
GetResponse getResponse = null;
try {
getResponse = rabbitUtil.fetch(DQConstant.CLICK_QUEUE_NAME, false);
/**
* 業務處理
*/
throw new RuntimeException("錯粗了");

} catch (Exception e) {
if(null != getResponse) {
long retryCount = getRetryCount(getResponse.getProps());
if(retryCount > 3) {
//重試超過3次的,直接存入失敗隊列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_FAILED_EXCHANGE_NAME, DQConstant.CLICK_FAILED_ROUTING_KEY, properties, getResponse.getBody());
} else {
//重試不超過3次的,加入到重試隊列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_RETRY_EXCHANGE_NAME, DQConstant.CLICK_RETRY_ROUTING_KEY, properties, getResponse.getBody());
}
}
}
if(null != getResponse) {
rabbitUtil.ack(getResponse);
}
}
private long getRetryCount(AMQP.BasicProperties properties) {
long retryCount = 0;
Map<String, Object> headers = properties.getHeaders();
if(null != headers) {
if(headers.containsKey("x-death")) {
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
if(!deathList.isEmpty()) {
Map<String, Object> deathEntry = deathList.get(0);
retryCount = (Long)deathEntry.get("count");
}
}
}
return retryCount;
}
4、x-death的使用:message在轉換成dead letter時,會在其header裏添加一個名為x-death的數組。數組元素就是一次dead lettering event的記錄。包含count:消息幾次變成了dead letter。

總結:

此處只是本人的拙見。如有更好的提議,歡迎拍磚。
---------------------
作者:洛杉磯的管理局
來源:CSDN
原文:://blog.csdn.net/qq_18991441/article/details/80692255
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

異步通信rabbitmq——消息重試