1. 程式人生 > >(原)NSQ原始碼閱讀和分析(1)

(原)NSQ原始碼閱讀和分析(1)

 原文出處:https://www.cnblogs.com/lihaiping/p/12324371.html

本文記錄自己在閱讀和學習nsq原始碼的時候的一些學習筆記,主要目的是個人總結和方便後期查閱。

author:[email protected]

date:2020/01/13

NSQ對於傳送出去的訊息,是如何保證可靠性的

對於topic下的channel中的訊息會有router()函式進行路由,它將訊息流轉到記憶體的chan中,即channel中的incomingMsgChan轉到memoryMsgChan,如果memoryMsgChan訊息滿了,堵住的話,就會將訊息寫入到backend中。

對於memoryMsgChan中訊息,會在channel的messagePump()函式中進行再一次的流轉,他會將訊息從memoryMsgChan通道中接收,然後再次轉發到clientMsgChan中去。

對於每個client網路連結,NSQ都會對此client網路連結新建一個IOLoop()的goroutine來處理一切和client的訊息。當client傳送SUB訂閱命令之後,client會根據它訂閱的topic和channel,再啟動一個goroutine來推送channel中的msg到client,這個函式就是messagePump()函式。而在client的messagePump中,主要的任務就是傳送心跳和和接收來自channel中clientMsgChan訊息,然後將訊息打包傳送給client。當然訊息傳送給client可能會失敗,所以NSQ在這裡做一個很好的容錯失敗的策略,當我們將訊息推送給client的時候,既然訊息可能會失敗,所以我們就需要將訊息存起來,於是client會將這個訊息在它所在的channel中,啟動一個超時策略,如果超時的話,訊息會被再次以跟寵topic流轉到chanel同樣的流程進入到channel的訊息流轉流程。

而client在傳送網路訊息之前,會通過呼叫client.Channel.StartInFlightTimeout(msg, client)函式,來將訊息msg和client一起生成一個另外的訊息物件inFlightMessage,然後再將這個訊息物件增加超時時間,進一步封裝成pqueue.Item,然後分別儲存到 channel的inFlightMessages和inFlightPQ中,其中inFlightMessages是一個map,他以msg的id為key進行儲存,方便等會當客戶端收到訊息應答以後,進行刪除操作。同時在inFlightPQ中,這個主要是做超時用的,因為channel中會啟動一個goroutine函式inFlightWorker來專門處理inFlightPQ中超時訊息。

在inFlightWorker()中,主要根據超時時間來不斷的從pq佇列中取訊息,如果超時時間到了,訊息還沒有從inFlightPQ佇列中刪除掉,說明這個訊息可能丟失或者出現什麼問題了,我們就需要重新流轉這個訊息。

&n