NSQ原始碼分析(三)——disQueue
disQueue是Topic以及Channel中的記憶體訊息滿了以後,訊息存放的實現方法,diskQueue實現了檔案系統支援的FIFO佇列,disQueue也是BackendQueue介面的實現,diskQueue在檔案讀寫給我們提供了很好的學習示例。
BackendQueue介面
// BackendQueue represents the behavior for the secondary message storage system type BackendQueue interface { Put([]byte) error //向檔案中寫入資料 ReadChan() chan []byte //返回一個讀的chan Close() error //關閉 Delete() error //刪除 Depth() int64 //獲取未讀訊息的個數 Empty() error //清空操作 }
diskQueue結構體及欄位的解釋
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) //以下5個欄位為元資料 readPos int64 //當前讀檔案的指標偏移量(從檔案的某個位置開始讀) writePos int64 //當前寫檔案的指標偏移量 readFileNum int64 //當前讀的檔案編號 writeFileNum int64 //當前寫的檔案編號,用於建立檔名使用,檔案編號每新建一個檔案會遞增1 depth int64 //寫一條訊息加1,讀一條訊息減1,可以理解成還未讀完的訊息數量 sync.RWMutex // instantiation time metadata name string //名稱 dataPath string //檔案的路徑 maxBytesPerFile int64 //每個檔案的最大位元組數 minMsgSize int32 //單條訊息的最小位元組數,預設是MsgIDLength + 8 + 2 = 26 maxMsgSize int32 //單條訊息的最大位元組數,預設是1024 * 1024 + minMsgSize syncEvery int64 // 定期重新整理檔案的讀寫次數閾值(預設2500) syncTimeout time.Duration // 定期重新整理檔案的時間戳閾值(預設2s) exitFlag int32 needSync bool //需要重新整理檔案 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次需要讀的指標偏移量 nextReadFileNum int64 //下次需要讀的檔案編號 readFile *os.File //當前正在讀的檔案,如果為nil則讀取下一個檔案編號的檔案 writeFile *os.File //當前正在寫的檔案,如果為nil則新建檔案 reader *bufio.Reader writeBuf bytes.Buffer // exposed via ReadChan() readChan chan []byte //通過ReadChan()函式暴露出去 // internal channels writeChan chan []byte //寫通道 writeResponseChan chan error //寫之後返回的結果 emptyChan chan int //清空檔案的通道 emptyResponseChan chan error exitChan chan int exitSyncChan chan int logf AppLogFunc //處理日誌的函式 }
一、diskQueue的建立和初始化
1.初始化diskQueue例項
2.呼叫retrieveMetaData函式 從磁碟中恢復diskQueue的狀態。diskQueue會定時將自己的狀態備份到檔案中
3.開啟一個協程執行ioLoop函式,ioLoop函式是整個disQueue最核心的方法,作用是實現disQueue的訊息迴圈,定時重新整理檔案,讀寫操作功能
func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { d := diskQueue{ name: name, dataPath: dataPath, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), emptyResponseChan: make(chan error), exitChan: make(chan int), exitSyncChan: make(chan int), syncEvery: syncEvery, syncTimeout: syncTimeout, logf: logf, } // no need to lock here, nothing else could possibly be touching this instance err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) } go d.ioLoop() return &d }
retrieveMetaData函式
retrieveMetaData函式記錄了disQueue的元資料readPos、writePos、readFileNum、writeFileNum、depth
1.retrieveMetaData()函式用於初始化檔案系統,作用是當程式突然中止,檔案中的訊息未讀完,該函式用於初始化讀寫的檔案編號和位置及未讀的訊息數depth
2.retrieveMetaData函式從磁碟中恢復diskQueue的狀態。diskQueue會定時將自己的狀態備份到檔案中,
3.檔名由metaDataFileName函式確定。retrieveMetaData函式同樣通過metaDataFileName函式獲得儲存狀態的檔名並開啟。
4. 該檔案只有三行,格式為%d\n%d,%d\n%d,%d\n,第一行儲存著該diskQueue中未讀的訊息數量(depth),第二行儲存readFileNum和readPos,第三行儲存writeFileNum和writePos。
func (d *diskQueue) retrieveMetaData() error {
var f *os.File
var err error
fileName := d.metaDataFileName() //獲取檔名
f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
return err
}
defer f.Close()
var depth int64
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
if err != nil {
return err
}
atomic.StoreInt64(&d.depth, depth)
d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos
return nil
}
persistMetaData函式
與retrieveMetaData相對應的是persistMetaData函式,這個函式將執行時的元資料儲存到檔案用於下次重新構建diskQueue時的恢復。
邏輯基本與retrieveMetaData,此處不再贅述。
func (d *diskQueue) persistMetaData() error {
var f *os.File
var err error
fileName := d.metaDataFileName()
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
// write to tmp file
f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
atomic.LoadInt64(&d.depth),
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()
// atomically rename
return os.Rename(tmpFileName, fileName)
}
二、diskQueue的訊息迴圈
ioLoop函式實現了diskQueue的訊息迴圈功能,即輪詢讀寫、重新整理檔案等操作。
重新整理檔案的目的:防止突然結束程式後記憶體中的內容未被提交到磁碟,導致內容丟失
有兩種情況下會重新整理檔案
1.當count達到syncEvery時,即讀寫的次數累積到syncEvery。重新整理檔案後count會被置為0
2.定時重新整理,每syncTimeOut就會重新整理檔案
count的變化規則
1.如果一次訊息迴圈中,有讀或寫操作,count會自增1
2.當count達到syncEvery時,count會置為0,並重新整理檔案
3.當收到emptyChan消失時,會將count置為0,因為檔案已經被刪除了
判斷檔案中有可讀的訊息
(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) 即讀的檔案編號小於寫的檔案編號或者讀的指標偏移量小於寫的指標偏移量
讀取檔案中的下一條訊息需要滿足兩個條件:
1.檔案中有可讀的訊息
2. d.nextReadPos = d.readPos 即上次讀到的訊息已經投遞出去,需要讀取下條新訊息
d.nextReadPos 和 d.readPos的區別
nextReadPos是下次要讀取訊息的偏移量,讀取的訊息會賦值給dataRead。
當訊息讀取到之後不一定本次迴圈一定會執行 case r <- dataRead: 將訊息投遞出去,也可能會執行其他的case分支。此時nextReadPos是下次要讀的訊息的位置,而readPos是本次訊息讀的位置
nextReadPos是下次要讀取訊息的偏移量,也是訊息投遞成功後需要讀的位置。而readPos當訊息投遞出去才會等於nextReadPos的值
可以簡單理解為:
訊息讀取前 nextReadPos = readPos
訊息已讀取,但沒有投遞出去,nextReadPos是下次訊息要讀的位置,而readPos仍是本次訊息讀的開始位置。此時:nextReadPos = readPos + 本條訊息的長度
訊息投遞成功後: readPos = nextReadPos ,將nextReadPos的值賦值給readPos
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64 //讀寫的累積次數
var r chan []byte
syncTicker := time.NewTicker(d.syncTimeout)
for {
// dont sync all the time :)
if count == d.syncEvery {
d.needSync = true
}
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
count = 0
}
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { //訊息檔案中有可以讀的訊息
if d.nextReadPos == d.readPos { //即上次讀到的訊息已經投遞出去,需要讀取下條新訊息
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
r = d.readChan
} else {
r = nil
}
select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
//如果r為空,則這個分支會被跳過。這個特性的使用了select的邏輯,簡化了當資料為空時的判斷
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}
exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}
三、檔案寫操作
writeOne函式負責將訊息寫入到檔案中
1.如果writeFile為nil,則需要新建檔案
2.如果writePos大於0,則設定寫的指標偏移量
3.校驗訊息長度
4.將訊息的長度(4個位元組) 和 訊息作為一條訊息寫入到檔案中
5.改變 writePos 和depth的值
6.如果寫的指標偏移量大於每個檔案的最大位元組數,將writeFileNum遞增,並將writeFile置為nil,用於下次寫入訊息的時候建立檔案
func (d *diskQueue) writeOne(data []byte) error {
var err error
if d.writeFile == nil { //如果writeFile為nil
curFileName := d.fileName(d.writeFileNum) //根據writeFileNum獲取檔名稱
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) //建立檔案
if err != nil {
return err
}
d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
if d.writePos > 0 { //如果writePos大於0,則設定寫的指標偏移量
_, err = d.writeFile.Seek(d.writePos, 0)
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
}
dataLen := int32(len(data)) //本次訊息的長度
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { //校驗訊息長度
return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
}
d.writeBuf.Reset() //重置writeBuf緩衝區
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) //將資料長度dataLen以二進位制大端的形式寫入到writeBuf中
if err != nil {
return err
}
_, err = d.writeBuf.Write(data) //將data寫入到writeBuf緩衝區中
if err != nil {
return err
}
// only write to the file once
_, err = d.writeFile.Write(d.writeBuf.Bytes()) //將writeBuf緩衝區的資料寫入到檔案中
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
totalBytes := int64(4 + dataLen)
d.writePos += totalBytes //改變writePos(寫的偏移量)
atomic.AddInt64(&d.depth, 1) //depth的值加1
if d.writePos > d.maxBytesPerFile { //如果寫的指標偏移量大於每個檔案的最大位元組數
/*
將writeFileNum遞增1,用於建立下一個檔案使用
writePos寫的指標偏移量置為0
重新整理檔案
關閉檔案
將writeFile置為nil,那麼下次有新訊息需要寫入檔案則會新建檔案
這個操作的目的是為了防止單個檔案過大
*/
d.writeFileNum++
d.writePos = 0
// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}
return err
}
四、檔案讀操作
readOne函式主要用於從檔案中讀取下一條未讀訊息
主要流程:
1.如果readFile為nil,則讀取新的檔案
2.根據readPos獲取讀的位置,並讀取4個位元組(即本條訊息的長度),然後讀取本條訊息
3.更新 nextReadPos和nextReadFileNum 的值
4.如果下次需要讀的位置大於每個檔案的最大值,則將nextReadFileNum遞增,nextReadPos置為0,關閉readFile並置為nil。這樣下次就會讀取下一個檔案的訊息
5.最後返回本條訊息的內容
func (d *diskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32
if d.readFile == nil { //如果readFile為nil,則讀取新的檔案
/*
獲取並開啟需要讀取的檔案
*/
curFileName := d.fileName(d.readFileNum)
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
if d.readPos > 0 {//如果readPos大於0,則設定讀的指標偏移量
_, err = d.readFile.Seek(d.readPos, 0)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
}
d.reader = bufio.NewReader(d.readFile) //設定reader
}
err = binary.Read(d.reader, binary.BigEndian, &msgSize) //從reader中讀取4個位元組,也就是本條訊息的長度
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { //校驗訊息長度
// this file is corrupt and we have no reasonable guarantee on
// where a new message should begin
d.readFile.Close()
d.readFile = nil
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
}
readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf) //將訊息讀取到reaBuf中
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
totalBytes := int64(4 + msgSize)
// we only advance next* because we have not yet sent this to consumers
// (where readFileNum, readPos will actually be advanced)
d.nextReadPos = d.readPos + totalBytes //設定下次需要讀的指標偏移量
d.nextReadFileNum = d.readFileNum
// TODO: each data file should embed the maxBytesPerFile
// as the first 8 bytes (at creation time) ensuring that
// the value can change without affecting runtime
if d.nextReadPos > d.maxBytesPerFile { //如果下次需要讀的位置大於每個檔案的最大值
/*
關閉正在讀的檔案,並置為nil
nextReadFileNum遞增1
nextReadPos置為0
*/
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
}
d.nextReadFileNum++
d.nextReadPos = 0
}
return readBuf, nil
}
五、檔案重新整理
sync函式主要用於重新整理檔案
1.呼叫file的Sync()函式,將快取中的訊息持久化到檔案中
2.將元資料更新持久化
func (d *diskQueue) sync() error {
if d.writeFile != nil {
err := d.writeFile.Sync()
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
err := d.persistMetaData()
if err != nil {
return err
}
d.needSync = false
return nil
}
以上就是diskQueue檔案佇列的主要實現流程和方法。