1. 程式人生 > >MQ實現“延遲訊息”功能

MQ實現“延遲訊息”功能

轉載文章:轉自個人微信公眾號[架構師之路];作者:[沈劍]

一、緣起

很多時候,業務有在一段時間之後,完成一個工作任務的需求。

例如:滴滴打車訂單完成後,如果使用者一直不評價,48小時後會將自動評價為5星。

一般來說怎麼實現這類“48小時後自動評價為5星”需求呢?

常見方案:啟動一個cron定時任務,每小時跑一次,將完成時間超過48小時的訂單取出,置為5星,並把評價狀態置為已評價。

假設訂單表的結構為:t_order(oid, finish_time, stars, status, …),更具體的,定時任務每隔一個小時會這麼做一次:

select oid from t_order where finish_time > 48hours and status=0;

update t_order set stars=5 and status=1 where oid in[…];

如果資料量很大,需要分頁查詢,分頁update,這將會是一個for迴圈。

方案的不足:

1輪詢效率比較低

2)每次掃庫,已經被執行過記錄,仍然會被掃描(只是不會出現在結果集中),有重複計算的嫌疑

3時效性不夠好,如果每小時輪詢一次,最差的情況下,時間誤差會達到1小時

4)如果通過增加cron輪詢頻率來減少(3)中的時間誤差,(1)中輪詢低效和(2)中重複計算的問題會進一步凸顯

如何利用“延時訊息”,對於每個任務只觸發一次,保證效率的同時保證實時性,是今天要討論的問題。

二、高效延時訊息設計與實現

高效延時訊息,包含兩個重要的資料結構:

1環形佇列,例如可以建立一個包含3600slot環形佇列(本質是個陣列)

2任務集合,環上每一個slot是一個Set<Task>

同時,啟動一個timer,這個timer每隔1s,在上述環形佇列中移動一格,有一個Current Index指標來標識正在檢測的slot

Task結構中有兩個很重要的屬性:

1Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務

2Task-Function:需要執行的任務指標

假設當前Current Index指向第一格,當有延時訊息到達之後,例如希望

3610秒之後,觸發一個延時訊息任務,只需:

1計算這個Task應該放在哪一個slot,現在指向13610秒之後,應該是第11格,所以這個Task應該放在第11slotSet<Task>

2計算這個TaskCycle-Num,由於環形佇列是3600格(每秒移動一格,正好1小時),這個任務是3610秒後執行,所以應該繞3610/3600=1圈之後再執行,於是Cycle-Num=1

Current Index不停的移動,每秒移動到一個新slot,這個slot中對應的Set<Task>,每個TaskCycle-Num是不是0

1)如果不是0,說明還需要多移動幾圈,將Cycle-Num1

2)如果是0,說明馬上要執行這個Task了,取出Task-Funciton執行(可以用單獨的執行緒來執行Task),並把這個TaskSet<Task>中刪除

使用了“延時訊息”方案之後,“訂單48小時後關閉評價”的需求,只需將在訂單關閉時,觸發一個48小時之後的延時訊息即可:

1)無需再輪詢全部訂單,效率高

2)一個訂單,任務只執行一次

3)時效性好,精確到秒(控制timer移動頻率可以控制精度)

三、總結

環形佇列是一個實現延時訊息的好方法,開源的MQ好像都不支援延遲訊息,不妨自己實現一個簡易的“延時訊息佇列”,能解決很多業務問題,並減少很多低效掃庫的cron任務。

另外,關於MQ的可達性、冪等性未來撰文另述。

如果對文章和配圖滿意的話,幫忙轉發一下哈。

==【完】==

相關閱讀:

10w定時任務,如何高效觸發超時---還沒搬運過來

相關推薦

MQ實現延遲訊息功能

轉載文章:轉自個人微信公眾號[架構師之路];作者:[沈劍] 一、緣起 很多時候,業務有“在一段時間之後,完成一個工作任務”的需求。 例如:滴滴打車訂單完成後,如果使用者一直不評價,48小時後會將自動評價為5星。 一般來說怎麼實現這類“48小時後自動評價為5星

rabbitmq實現延遲訊息(附原始碼)

rabbitmq實現延遲訊息的方案 1. 使用延時佇列 單機不考慮拓展的情況下,可以使用java.util.concurrent包的DelayQueue, 但插入的物件需實現Delayed介面,並實現其getDelay方法。 優點:針對任意訊息佇列均可使用 缺

golang 實現延遲訊息原理與方法

package main; import ( "time" "errors" "fmt" ) //延遲訊息 type DelayMessage struct { //當前下標 curIndex int; //環形槽 slots [3600]map[string]

Spring Boot RabbitMQ 延遲訊息實現完整版

概述 曾經去網易面試的時候,面試官問了我一個問題,說 下完訂單後,如果使用者未支付,需要取消訂單,可以怎麼做 我當時的回答是,用定時任務掃描DB表即可。面試官不是很滿意,提出: 用定時任務無法做到準實時通知,有沒有其他辦法? 我當時的回答是: 可以用

java基於redis訂閱/釋出訊息實現聊天室功能

一、引言 趁著國慶節把redis高階應用都寫完吧,其實都很簡單。 redis高階應用:安全性、事務處理、持久化操作,訂閱/釋出、虛擬記憶體 安全性其實就是在連線redis時,需要一個密碼認證,可以

RocketMq-延遲訊息及 程式碼實現

支援延遲訊息 RocketMQ 支援定時訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。 配置 開啟安裝目錄的

Visual C++網路程式設計經典案例詳解 第5章 網頁瀏覽器 製作個性化介面 如何實現收藏夾功能 新增訊息響應函式

使用者將網址新增到收藏夾以後 便可以直接單擊選單選單中的網址進行瀏覽 使用者單擊選單的訊息響應函式重要 首先在CMainFrame類的標頭檔案MainFrm.h 中定義一個彈出選單的訊息響應函式 程式碼如下 afx_msg void OnMenuClick(int nID); //定

分散式延遲訊息佇列實現分析與設計

介紹 延遲佇列,顧名思義它是一種帶有延遲功能的訊息佇列。 那麼,是在什麼場景下我才需要這樣的佇列呢? 很多時候我們會有延時處理一個任務的需求,比如說: 2個小時後給使用者傳送簡訊。 15分鐘後關閉網路連線。 2分鐘後再次嘗試回撥。 下面我們來分別探討一下幾種實現方案: Ja

使用jedis實現Redis訊息佇列(MQ)的釋出(publish)和訊息監聽(subscribe)

前言: 本文基於jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar 其中jedis連線池需要依賴commons-pool2包,json

Java後臺框架篇--spring websocket 和stomp實現訊息功能

<%@ taglib prefix="spring" uri="http://www.springframework.org/tags" %> <html> <head> <title>Home</title> <spring:

iOS經典講解之實現App訊息推送功能(二)

作者:Loving_iOS 上一篇部落格iOS經典講解之實現App訊息推送功能(一)講解了實現訊息推送的的準備工作,接下來我們來講解第二部分的內容,實現具體的推送及程式碼示例。 訊息推送的第三方平臺有很多,這裡我們使用極光推送平臺,註冊極光推送平臺的賬號。 登陸後進入控

第18章 使用WebSocket和 STOMP實現訊息功能

概述: 瀏覽器和伺服器之間傳送訊息在SpringMVC控制器中處理訊息為目標使用者傳送訊息為了解決應用為web應用之間的通訊 Spring4.0 為 WebSocket通訊提供了支援 包括: 傳送和接收訊息的低層級API;傳送和接收訊息的高階API;用來發送訊息的模板

利用Rabbit MQ 實現一對多通知功能(動態新增刪除佇列交換機)

樓主在專案中需要實現分散式lucene查詢,由於lucene的索引是存放在本地的。網上有很多方案實現起來相對比較複雜,故樓主為了簡單化針對索引同步問題採用的方案是,如果某一結點發生索引的增刪改,通過rabbitmq通知所有lucene節點也進行本地的索引的更改。

使用MongoDB實現訊息佇列的非同步訊息功能

一、訊息佇列概述 訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。 目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ

C#實現rabbitmq 延遲佇列功能

 最近在研究rabbitmq,專案中有這樣一個場景:在使用者要支付訂單的時候,如果超過30分鐘未支付,會把訂單關掉。當然我們可以做一個定時任務,每個一段時間來掃描未支付的訂單,如果該訂單超過支付時間就關閉,但是在資料量小的時候並沒有什麼大的問題,但是資料量一大輪訓資料庫的

Spring Boot + Redis + MQ實現高併發點贊功能:專案實戰

開心一笑 【最後一次警告看我CSDN部落格的人,你們都他媽給老子小心點,首先我不管你是什麼身份,什麼背景,混的有多牛逼,是不是老闆,是不是社會人,這些我都不知道,我也都不在意,你們給我記住,都給我認真點看清楚:最近天氣實在很涼,大家多穿衣服,彆著涼了,知道不!

使用spring websocket 和stomp實現訊息功能

<%@ taglib prefix="spring" uri="http://www.springframework.org/tags" %> <html> <head> <title>Home</title> <spring:

Spring使用WebSocket、SockJS、STOMP實現訊息功能

WebSocket 概述 WebSocket協議提供了通過一個套接字實現全雙工通訊的功能。除了其他的功能之外,它能夠實現Web瀏覽器和伺服器之間的非同步通訊。全雙工意味著伺服器可以傳送訊息給瀏覽器,瀏覽器也可以傳送訊息給伺服器。 使用Spring的低層級WebS

使用AMQP實現訊息功能---RabbitMQ

實際上,AMQP具有多項JMS所不具備的優勢。首先,AMQP為訊息定義了線路層的協議。AMQP在互相協作方面就要優於JMS—它不僅能跨不同的AMQP實現,還能跨語言和平臺。AMQP能夠不侷限於java平臺和語言。 1. AMQP簡介 在JMS訊息中主要有三

spring 使用WebSocket 和 STOMP 實現訊息功能

1)本文旨在 介紹如何 利用 WebSocket 和 STOMP 實現訊息功能; 2)要知道, WebSocket 是傳送和接收訊息的 底層API,而SockJS 是在 WebSocket 之上的 API;最後 STOMP(面向訊息的簡單文字協議)是基於 SockJS 的高