1. 程式人生 > >消費端如何保證訊息佇列MQ的有序消費

消費端如何保證訊息佇列MQ的有序消費

訊息無序產生的原因

訊息佇列,既然是佇列就能保證訊息在進入佇列,以及出佇列的時候保證訊息的有序性,顯然這是在訊息的生產端(Producer),但是往往在生產環境中有多個訊息的消費端(Consumer),儘管消費端在拉取訊息時是有序的,但各個訊息由於網路等方面原因無法保證在各個消費端中處理時有序。

場景分析

先後兩次修改了商品資訊,訊息A和訊息B先後同步寫入MySQL,接著非同步寫入訊息佇列中傳送訊息,此時訊息佇列生產端(Producer)按時序先後發出了A和B兩條訊息(訊息A先發出,訊息B後發出)。按業務邏輯,商品資訊的最終狀態需要以訊息A和訊息B綜合為準。

看似一個比較常見的同步寫資料庫,非同步傳送訊息的場景,但實際上需要保證訊息的有序消費。

  • 假設1:訊息A只包含修改的商品名稱,訊息B只包含修改的商品重量,此時訊息佇列的消費端實際上不需要關注訊息時序,訊息佇列消費端(Consumer)只管消費即可。
  • 假設2:訊息A包含修改的商品名稱、重量,訊息B包含修改的商品名稱,此時消費端首先接收到訊息B,後接收到訊息A,那麼訊息B的修改就會被覆蓋。此時訊息佇列的消費端實際上又需要關注訊息時序。

可見,你無法保證訊息中包含什麼資訊,此時必須保證訊息的有序消費。

業務角度如何保證訊息有序消費

  • 生產端在傳送訊息時,始終保證訊息是全量資訊。
  • 消費端在接收訊息時,通過快取時間戳的方式,消費訊息時判斷訊息產生的時間是否最新,如果不是則丟棄,如果是則執行下一步。

下面通過虛擬碼的方式描述:

生產端虛擬碼

insertWare(ware); #插入資料到資料庫,通常在插入資料庫時我們只會update修改的欄位,而不會全量插入

ware = selectWareById(ware.getId); #獲取商品的全量資訊(此時是最新的),用於將它放入到訊息佇列中

syncMq(ware); #非同步傳送mq訊息A

消費端虛擬碼

ware = fetchWare(); #獲取訊息

if (isLasted(ware)) #通過商品的修改時間戳判斷是否是最新的修改

​ TODO #執行下一步業務邏輯

else

​ return #丟棄該訊息

重點在於消費端如何判斷該訊息是否是最新的修改也就是isLasted

方法。

isLasted方法

Long modified = getCacheById(ware.getId); #獲取快取中該條商品的最新修改時間

If (ware.getModified > modified) { #如果訊息中商品修改時間大於快取中的時間,說明是最新操作

​ setCacheById(ware); #將該條訊息的商品修改時間戳寫入到快取中

​ return true;
} else #如果訊息中的商品修改時間小於快取中的時間,說明該條訊息屬於“歷史操作”,不對其更新

​ return false;

以上就是通過虛擬碼的方式,描述如何通過業務手段保證訊息有序消費,重點在於全量傳送資訊和快取時間戳。在其中還有一些技術實現細節。

例如:消費端消費訊息B,執行到獲取時間戳快取之後,並在重新設定新的快取之前,此時另一個消費端恰好也正在消費B它也正執行到獲取時間戳快取,由於訊息A此時並沒有更新快取,訊息A拿到的快取仍然是舊的快取,這時就會存在兩個消費端都認為自己所消費的訊息時最新的,造成該丟棄的訊息沒丟。

顯然,這是分散式執行緒安全問題,分散式鎖通常使用Redis或者ZooKeeper,加鎖後的執行時序如下圖所示。

這是從業務角度保證訊息在消費端有序消費。通過在訊息傳送端全量傳送訊息以及在訊息消費端快取時間戳就可以保證訊息的有序消費。

在上述場景中是先同步寫入MySQL,再獲取商品全量資料,接著再非同步傳送訊息。這一系列的步驟可以通過接MySQL的binlog實現,在同步寫入MySQL後,MySQL傳送binlog變更,通過阿里巴巴Canal中介軟體接收MySQL的binlog變更再發送訊息到訊息佇列。

這是一個能給程式設計師加buff的公眾號 (CoderBuff)