1. 程式人生 > >使用rabbitmq 實現延遲消費

使用rabbitmq 實現延遲消費

使用場景介紹

1、發版時經常需要不停機發版,遇到mq消費者,消費一半停機就會出現訊息丟失(沒有使用手動確認的情況)

2、例如支付場景,準時支付、超過未支付將執行不同的方案,其中超時未支付可以看做一個延時訊息。

3、例如滴滴、淘寶的自動評價都是類似場景(不一定是用的什麼技術)

我是發版的情況遇到了

可能有人會問了,mq支援手動確認啊,為什麼不使用確認機制呢?
1、由於用的是Spring 的RabbitListener註解,無法使用手動確認機制
2、也是最主要的原因是我們使用mq的場景

(類似於假如我做一個事情需要兩步完成,每一步完成都會收錢。那麼就會出現,第一步完成後,停機了,假如使用訊息重發就會造成浪費第一步完成的錢)

實現方式

開始想到的是發版時,可以將訊息發到佇列裡,然後消費者不要馬上消費,等一定的時間再來消費這一訊息。

在網上找資料找到兩種實現方式:

1、使用外掛

在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列,而我們公司mq是單獨管理的,所以這種方式直接pass

2、使用兩個特性

AMQP和RabbitMQ本身沒有直接支援延遲佇列功能,但是可以通過以下特性模擬出延遲佇列的功能。但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲佇列。

Time To Live(TTL)

RabbitMQ可以針對Queue和Message設定 x-message-tt,來控制訊息的生存時間,如果超時,則訊息變為dead letter

RabbitMQ針對佇列中的訊息過期時間有兩種方法可以設定。
  1. A: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。  
  2. B: 對訊息進行單獨設定,每條訊息TTL可以不同。  
如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead letter

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由。

x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange

x-dead-letter-routing-key:指定routing-key傳送
隊列出現dead letter的情況有:
  1. 訊息或者佇列的TTL過期  
  2. 佇列達到最大長度  
  3. 訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false  

利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。這時候訊息就可以重新被消費。

示例程式碼

初始化過期佇列的程式碼

  1. @Bean
  2.     public Queue signQueueStore() {  
  3.         Map<String, Object> arguments = new HashMap<>();  
  4.         arguments.put("x-dead-letter-exchange", exchange);  
  5.         arguments.put("x-dead-letter-routing-key", signKey);  
  6.         arguments.put("x-message-ttl",300000);  
  7.         Queue queue = new Queue(signQueueStore,true,false,false,arguments);  
  8.         System.out.println("arguments :" + queue.getArguments());  
  9.         return queue;  
  10.     }  

繫結過期佇列的程式碼

  1. @Bean
  2.     public Binding  signStoreBinding() {  
  3.         return BindingBuilder.bind(signQueueStore()).to(defaultExchange()).with(signKeyStore);  
  4.     }  

初始化正常佇列的程式碼和繫結普通佇列的程式碼

  1. @Bean
  2.     public Queue signQueue() {  
  3.         returnnew Queue(signQueue);  
  4.     }  
  5.     @Bean
  6.     public Binding signBinding(Queue signQueue, DirectExchange defaultExchange) {  
  7.         /** 將佇列繫結到交換機 */
  8.         return BindingBuilder.bind(signQueue).to(defaultExchange).with(signKey);  
  9.     }  

其他傳送與消費不變

在重啟時將訊息傳送到過期佇列,在重啟完成後,訊息傳送到正常佇列,過期佇列訊息過期後會自動路由到正常佇列進行消費。(可以設定一個標誌,判斷該標誌位重啟狀態,則發到過期佇列,為正常狀態傳送到正常佇列)

相關推薦

使用rabbitmq 實現延遲消費

使用場景介紹1、發版時經常需要不停機發版,遇到mq消費者,消費一半停機就會出現訊息丟失(沒有使用手動確認的情況)2、例如支付場景,準時支付、超過未支付將執行不同的方案,其中超時未支付可以看做一個延時訊息。3、例如滴滴、淘寶的自動評價都是類似場景(不一定是用的什麼技術)我是發版

rabbitmq實現延遲訊息(附原始碼)

rabbitmq實現延遲訊息的方案 1. 使用延時佇列 單機不考慮拓展的情況下,可以使用java.util.concurrent包的DelayQueue, 但插入的物件需實現Delayed介面,並實現其getDelay方法。 優點:針對任意訊息佇列均可使用 缺

springboot 使用RabbitMQ實現延遲佇列

RabbitMQ延遲訊息實現方式 延遲任務通過訊息的TTL和Dead Letter Exchange來實現。我們需要建立2個佇列,一個用於傳送訊息,一個用於訊息過期後的轉發目標佇列。 生產者輸出訊息到Queue1,並且這個訊息是設定有有效時間的,比如60s。訊息會在Queue1中等待6

Springboot + rabbitMQ實現延遲佇列(消費者)

由於太長了,所以分成兩篇寫,接上一篇講解了訊息的定義和傳送,這裡繼續講解消費者 由於可能每條訊息所處理的邏輯可能不一樣,例如:常規訂單30分鐘不支付則取消訂單,團購訂單一天拼團不成功則取消等等,為了避免在消費者監聽類中使用大量if else,這裡使用策略模式來處理(由於sp

rabbitmq 實現延遲佇列的兩種方式

轉載請註明出處 ps: 文章裡面延遲佇列=延時佇列 什麼是延遲佇列 延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。 場景一:在訂

使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制

> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。 # 本文模擬場景 1. 當金額少於100時,訊息消費成功 1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。 1. 當金額大於200時

如何用RabbitMQ實現延遲佇列

# 前言 在 `jdk` 的 `juc` 工具包中,提供了一種延遲佇列 `DelayQueue`。延遲佇列用處非常廣泛,比如我們最常見的場景就是在網購或者外賣平臺中發起一個訂單,如果不付款,一般 `15` 分鐘後就會被關閉,這個直接用定時任務是不好實現的,因為每個使用者下單的時間並不確定,所以這時候就需要用

Spring Boot 實現 RabbitMQ 延遲消費延遲重試佇列

並增加了自己的一些理解,記錄下來,以便日後查閱。 專案原始碼: 背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景: 延遲消費。比如:使用者生成

RabbitMQ死信機制實現延遲佇列

延遲佇列 延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。 應用場景 三方支付,掃碼支付呼叫上游的掃碼介面,當掃碼有效期過後去呼叫查詢介面查詢結果。實現方式:每當一筆掃碼支付請求後

RabbitMQ延遲消費和重複消費

轉載自 https://blog.csdn.net/quliuwuyiz/article/details/79301054 使用RabbitMQ實現延遲任務 場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。 場景二:訂單下單之後30分鐘後,如

Spring Boot + RabbitMQ死信機制實現延遲佇列

本文轉載自部落格:http://blog.csdn.net/nexttake/article/details/78607486 ------------------------------------------------------------------------------------

RabbitMQ使用延遲佇列實現一次性定時任務(php版)

本文建立在讀者對RabbitMQ的基礎瞭解上 本文延遲佇列實現參照 https://blog.csdn.net/u012119576/article/details/74677835 對相關概念的理解參照 https://blog.csdn.net/samxx8/arti

java 實現利用 RabbitMQ 傳送和消費訊息

import com.rabbitmq.client.*; import java.io.IOException; public class Recv {private final static String QUEUE_NAME = "hello";public static void main(Stri

如何使用Spring Boot與RabbitMQ結合實現延遲佇列

背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景:延遲消費。比如: 使用者生成訂單之後

Springboot整合Rabbitmq實現延時消費,並實現可靠的訊息處理

一、Rabbitmq簡介1.1 rabbitmq 架構1.2 rabbitmq相關元件介紹exchange: 交換機,主要用來將生產者傳送的訊息路由給伺服器中的佇列。routing-key: 訊息路由的key,生產者在將訊息發到到exchange的時候,需要指定routing

SpringBoot | 第三十八章:基於RabbitMQ實現訊息延遲佇列方案

前言 前段時間在編寫通用的訊息通知服務時,由於需要實現類似通知失敗時,需要延後幾分鐘再次進行傳送,進行多次嘗試後,進入定時傳送機制。此機制,在原先對接銀聯支付時,銀聯的非同步通知也是類似的,在第一次通知失敗後,支付標準服務會重發,最多傳送五次,每次的間隔時間為1、4、8、16分鐘等。本文就簡單講解下使用Ra

MOOC清華《面向對象程序設計》第8章:用代理模式實現延遲初始化

blank ngx cin www. oci 初始化 lanp margin 程序 Z9閃5PFVL衣坑9http://shequ.docin.com/zdfi78227 哦3O8b4z宦塹7http://huiyi.docin.com/dvok6368 W34搜6csiy

python操作rabbitmq實現廣播效果

連接 lba cti all rec alt aid tin back 生產方(Fanout_Publisher.py) 1 # __author__ = ‘STEVEN‘ 2 import pika 3 #開啟socket 4 connection = pik

關於並發場景下,通過雙重檢查鎖實現延遲初始化的優化問題隱患的記錄

ron href 修飾符 屬性 tin 記錄 targe turn 優化問題   首先,這個問題是從《阿裏巴巴Java開發手冊》的1.6.12(P31)上面看到的,裏面有這樣一句話,並列出一種反例代碼(以下為仿寫,並非與書上一致):   在並發場景下,通過雙重檢查鎖(do

RPC使用rabbitmq實現

bsp 本地服務 font clear 自動 配置 tran contain 學習 兩天時間重寫公司架構在本地實現測試學習 雙向連接客戶端和服務端配置: 連接rabbitmq服務器 定義消息隊列 配置發送請求的模板:交換機、消息隊列。 配置監聽處理:監聽的隊列、消息轉換處