1. 程式人生 > >nsq源碼閱讀筆記之nsqd(三)——diskQueue

nsq源碼閱讀筆記之nsqd(三)——diskQueue

hit emp files tro interact 一次 導致 store text

diskQueuebackendQueue接口的一個實現。backendQueue的作用是在實現在內存go channel緩沖區滿的情況下對消息的處理的對象。
除了diskQueue外還有dummyBackendQueue實現了backendQueue接口。

對於臨時(#ephemeral結尾)Topic/Channel,在創建時會使用dummyBackendQueue初始化backend
dummyBackendQueue只是為了統一臨時和非臨時Topic/Channel而寫的,它只是實現了接口,不做任何實質上的操作,
因此在內存緩沖區滿時直接丟棄消息。這也是臨時Topic/Channel和非臨時的一個比較大的差別。
每個非臨時Topic/Channel,創建的時候使用diskQueue

初始化backenddiskQueue的功能是將消息寫入磁盤進行持久化,
並在需要時從中取出消息重新向客戶端投遞。

diskQueue的實現在nsqd/disk_queue.go中。需要註意一點,查找diskQueue中的函數的調用可能不會返回正確的結果,
因為diskQueue對外是以backendQueue形式存在,因此查找diskQueue的函數的調用情況時應當查找backendQueue中相應函數的調用。

diskQueue的創建和初始化

diskQueue的獲得是通過newDiskQueue,該函數比較簡單,通過傳入的參數創建一個dispQueue
然後通過retrieveMetaData

函數獲取之前與該diskQueue相關聯的Topic/Channel已經持久化的信息。最後啟動ioLoop循環處理消息。

retrieveMetaData函數從磁盤中恢復diskQueue的狀態。diskQueue會定時將自己的狀態備份到文件中,
文件名由metaDataFileName函數確定。retrieveMetaData函數同樣通過metaDataFileName函數獲得保存狀態的文件名並打開。
該文件只有三行,格式為%d\n%d,%d\n%d,%d\n,第一行保存著該diskQueue中消息的數量(depth),
第二行保存readFileNumreadPos,第三行保存writeFileNum

writePos

這裏不太理解的一個地方是d.depth通過一個臨時變量去獲取然後通過atomic.StoreInt64保存。個人覺得沒有必要這麽做。
當然作者在nsqd: diskqueue corruption and depth accounting這個Pull Request中也提到:

I dont believe that this should be strictly necessary because retrieveMetaData is only ever called in NewDiskQueue and the ioLoopgoroutine is launched after that call (which according to the go memory model is safe).

However, I’m not 100% sure about interactions between the go memory model, go-routines, and the combined use of atomic and non-atomic operations (which is what this was relying on before this change… i.e. this was the only mutation of d.depth that was notusing atomic ops).

因此,這只是個比較保險的做法,並不一定意味著直接保存到d.depth就不安全。

retrieveMetaData相對應的是persistMetaData函數,這個函數將運行時的元數據保存到文件用於下次重新構建diskQueue時的恢復。
邏輯基本與retrieveMetaData,此處不再贅述。

diskQueue的消息循環

ioLoop函數實現了diskQueue的消息循環,diskQueue的定時操作和讀寫操作的核心都在這個函數中完成。

函數首先使用time.NewTicker(d.syncTimeout)定義了syncTicker變量,syncTicker的類型是time.Ticker
每隔d.syncTimeout時間就會在syncTicker.C這個go channel產生一個消息。
通過select syncTicker.C能實現至多d.syncTimeout時間就跳出select塊一次,這種方式相當於一個延時的default子句。
ioLoop中,通過這種方式,就能在一個goroutine中既實現消息的接收又實現定時任務(跳出select後執行定時任務,然後在進入select)。
有點類似於定時的輪詢。

ioLoop的定時任務是調用sync函數刷新文件,防止突然結束程序後內存中的內容未被提交到磁盤,導致內容丟失。
控制是否需要同步的變量是d.needSync,該變量在一次sync後會被置為false,在許多需要刷新文件的地方會被置為true
ioLoop中,d.needSync變量還跟刷新計數器count變量有關,count值的變化規則如下:

  1. 如果一次消息循環中,有寫入操作,那麽count就會被自增。
  2. count達到d.syncEvery時,會將count重置為0並且將d.needSync置為true,隨後進行文件的刷新。
  3. emptyChan收到消息時,count會被重置為0,因為文件已經被刪除了,所有要重置刷新計數器。
  4. syncTicker.C收到消息後,會將count重置為0,並且將d.needSync置為true。也就是至多d.syncTimeout時間刷新一次文件。

ioLoop還定時檢測當前是否有數據需要被讀取,如果(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos)
`d.nextReadPos == d.readPos這兩個條件成立,則執行d.readOne()並將結果放入dataRead中,然後設置rd.readChan
如果條件不成立,則將r置為空值nil。隨後的select語句中有case r <- dataRead:這樣一個分支,在註釋中作者寫了這是一個Golang的特性,
即:如果r不為空,則會將dataRead送入go channel。進入d.readChan的消息通過ReadChan函數向外暴露,最終被Topic/Channel的消息循環讀取。
而如果r為空,則這個分支會被跳過。這個特性的使用統一了select的邏輯,簡化了當數據為空時的判斷。

diskQueue的寫操作

寫操作的對外接口是Put函數,該函數比較簡單,加鎖,並且將數據放入d.writeChan,等待d.writeResponseChan的結果後返回。
d.writeChan的接收在ioLoop中select的一個分支,處理時調用writeOne函數,並將處理結果放入d.writeResponseChan

writeOne函數是寫操作的最終執行部分,負責將消息寫入磁盤。函數邏輯比較簡單。消息寫入步驟如下:

  1. 若當前要寫的文件不存在,則通過d.fileName(d.writeFileNum)獲得文件名,並創建文件
  2. 根據d.writePos定位本次寫的位置
  3. 從要寫入的內容得到要寫入的長度
  4. 先寫入3中計算出的消息長度(4字節),然後寫入消息本身
  5. d.writePos後移4 + 消息長度作為下次寫入位置。加4是因為消息長度本身也占4字節。
  6. 判斷d.writePos是否大於每個文件的最大字節數d.maxBytesPerFile,如果是,則將d.writeFileNum加1,
    並重置d.writePos。這個操作的目的是為了防止單個文件過大。
  7. 如果下次要寫入新的文件,那麽需要調用sync函數對當前文件進行同步。

diskQueue的讀操作

消息讀取對外暴露的是一個go channel,而數據的最終來源是ioLoop中調用的readOne函數。readOne函數邏輯跟writeOne類似,
只是把寫操作換成了讀操作,唯一差異較大的地方是d.nextReadPosd.nextReadFileNum這兩個變量的使用。

在寫操作時,如果寫入成功,則可以直接將寫入位置和寫入文件更新。但是對於讀操作來說,由於讀取的目的是為了向客戶端投遞,
因此無法保證一定能投遞成功。因此需要使用next開頭的兩個變量來保存成功後需要讀的位置,如果投遞沒有成功,
則繼續使用當前的讀取位置將再一次嘗試將消息投遞給客戶端。

當消息投遞成功後,則使用moveForward函數將保存在d.nextReadPosd.nextReadFileNum中的值取出,
賦值給d.readPosd.readFileNummoveForward函數還負責清理已經讀完的舊文件。最後,調用checkTailCorruption函數檢查文件是否有錯,
如果出現錯誤,則調用skipToNextRWFile重置讀取和寫入的文件編號和位置。

diskQueue中的其他函數

diskQueue中還有與錯誤處理相關的handleReadError,與關閉diskQueue相關的CloseDeleteexitEmptydeleteAllFiles等,
函數,邏輯較簡單,不再專門分析。

diskQueue總結

diskQueue主要邏輯是對磁盤的讀寫操作,較為瑣碎但沒有復雜的架構。
其中消息循環的思路和讀寫過程周全的考慮都值得學習的。

nsq源碼閱讀筆記之nsqd(三)——diskQueue