異步通信rabbitmq——消息重試
目標:
利用RabbitMQ實現消息重試和失敗處理,實現可靠的消費消費。在消息消費異常時,自動延時將消息重試,當重試超過一定次數後,則列為異常消息,等待後續特殊處理。
準備:
TTL:Time-To-Live,通過給消息、隊列設置過期時間(單位:毫秒),來控制消息、隊列的生命周期。在達到時間後,消息會變成dead message。
Dead Letter Exchanges:同普通的exchange無區別
消息重制本質是通過消息轉發來實現的。消息轉發的觸發是:
rejected - the message was rejected with requeue=false,
maxlen - the maximum allowed queue length was exceeded.
這裏,我們使用expired來實現。給消息設置TTL,到期後消息未被消費,則會變成dead messager,轉發到dead letter exchange。
流程圖:
實現:
1、創建三個exchange。沒有特殊要求
2、創建三個queue。
clickQueue@retry作為重試隊列,需要特殊處理:
x-dead-letter-exchange: clickExchange
x-message-ttl: 30000
3、處理代碼
public void retry() throws IOException {
//消息消費
GetResponse getResponse = null;
try {
getResponse = rabbitUtil.fetch(DQConstant.CLICK_QUEUE_NAME, false);
/**
* 業務處理
*/
throw new RuntimeException("錯粗了");
if(null != getResponse) {
long retryCount = getRetryCount(getResponse.getProps());
if(retryCount > 3) {
//重試超過3次的,直接存入失敗隊列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_FAILED_EXCHANGE_NAME, DQConstant.CLICK_FAILED_ROUTING_KEY, properties, getResponse.getBody());
} else {
//重試不超過3次的,加入到重試隊列
AMQP.BasicProperties properties = getResponse.getProps();
Map<String, Object> headers = properties.getHeaders();
if(null == headers) {
headers = new HashMap<>();
}
properties.builder().headers(headers);
rabbitUtil.send(DQConstant.CLICK_RETRY_EXCHANGE_NAME, DQConstant.CLICK_RETRY_ROUTING_KEY, properties, getResponse.getBody());
}
}
}
if(null != getResponse) {
rabbitUtil.ack(getResponse);
}
}
private long getRetryCount(AMQP.BasicProperties properties) {
long retryCount = 0;
Map<String, Object> headers = properties.getHeaders();
if(null != headers) {
if(headers.containsKey("x-death")) {
List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death");
if(!deathList.isEmpty()) {
Map<String, Object> deathEntry = deathList.get(0);
retryCount = (Long)deathEntry.get("count");
}
}
}
return retryCount;
}
4、x-death的使用:message在轉換成dead letter時,會在其header裏添加一個名為x-death的數組。數組元素就是一次dead lettering event的記錄。包含count:消息幾次變成了dead letter。
總結:
此處只是本人的拙見。如有更好的提議,歡迎拍磚。
---------------------
作者:洛杉磯的管理局
來源:CSDN
原文:://blog.csdn.net/qq_18991441/article/details/80692255
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
異步通信rabbitmq——消息重試