1. 程式人生 > >How we redesigned the NSQ

How we redesigned the NSQ

之前的 文章 講述了我們重塑NSQ的目的和目標, 接下來我們將詳細描述下每個功能的具體技術細節.

重構後架構圖

首先, 看一下重構後的整體架構圖:

New Arch

原來的幾個NSQ元件大部分功能是複用的, 圖中新增的就是元資料儲存服務-etcd, 以及資料同步和HA處理邏輯.

改造topic queue

為了增加副本和其他特性, 首先需要改造的就是nsq的topic資料寫入方式, 要保證資料最終落盤, 才能繼續後面的改造. 所以我們第一步重構資料寫入邏輯, 這塊邏輯本身並不依賴分散式功能, 可以獨立重構.

資料落盤

原版的topic寫入流程是通過golang裡面的chan來做的, 只有超過chan容量之後, 才會落盤. 但是chan有幾個缺點, 首先是記憶體資料, 資料會丟, 其次是隻能兩端操作, 無法做更多豐富的查詢操作. 因此chan本身作為資料儲存方案對於持久化資料是不太合適的. 改造這塊的邏輯還是比較簡單的, 只要把原來寫chan滿之後的資料才落盤的邏輯直接改成任何資料都落盤即可.

但是這樣修改之後, 第一是由於IO操作比記憶體耗時, 導致寫入速度不理想; 第二是需要處理channel和topic的資料關係問題, 每個channel的消費資料是獨立的, 原來處理方式是把所有資料複製一份到每個channel, 如果還是按這種方式處理, 會導致所有channel的資料需要再次操作資料寫入磁碟, 無疑會帶來更大的效能損失.為了避免落盤改造帶來的效能下降, 我們做了更多工作來優化寫入效能.

消費channel的資料同步處理

以前的方式之所以要從topic複製所有訊息到每個channel, 是因為使用的是golang裡面的chan這種機制, 只有複製才能保證每個消費組的資料互相獨立. 當所有資料落盤之後, 我們其實不需要再做這種資料複製的操作了, 只需要記錄每個channel已經同步的資料位移和每個channel的消費位移即可. 這樣所有的channel引用的是同一份topic磁碟資料, 每個channel維護自己的獨立位移資訊即可. 節省了資料複製的操作, 提高了效能, 也保證了各個channel之間的獨立性. 從流程上看, 改造後我們把topic的寫入和讀取流程分離開了, topic本身只負責寫入資料, channel僅負責讀取資料.

topic-channel-relationship.png

組提交和刷盤排程優化

topic到channel之間的資料複製步驟去掉之後, 已經帶來了較大的效能提升. 接著我們繼續優化topic本身的寫入優化.

首先, 在服務端引入常見的Group commit組提交方式, 將多個訊息一次性提交, 減少IO操作. 這樣不管是因為刷盤還是因為資料副本同步的延遲, 都會把這段時間的積累的多個訊息作為一組一次性寫入. 這樣就大大減少了需要操作的寫入次數. 這種服務端group commit的方式往往比Kafka客戶端批量寫入的方式會表現更好, 因為實際場景下, 往往單個客戶端的寫入並沒有那麼大, 而是分散到非常多的客戶端機器上面了. 而服務端group commit處理可以更好的優化這種場景. 具體程式碼如下:

// client async pub
// 客戶端pub的資料寫入臨時chan後, 在chan上面等待結果非同步返回
func internalPubAsync(clientTimer *time.Timer, msgBody *bytes.Buffer, topic *nsqd.Topic) error {  
  ...
    info := &nsqd.PubInfo{
        Done:       make(chan struct{}),
        MsgBody:    msgBody,
        StartPub:   time.Now(),
    }
    if clientTimer == nil {
        clientTimer = time.NewTimer(time.Second * 5)
    } else {
        clientTimer.Reset(time.Second * 5)
    }
    select {
    case topic.GetWaitChan() <- info:
    default:
        select {
        case topic.GetWaitChan() <- info:
        case <-topic.QuitChan():
            nsqd.NsqLogger().Infof("topic %v put messages failed at exiting", topic.GetFullName())
            return nsqd.ErrExiting
        case <-clientTimer.C:
            nsqd.NsqLogger().Infof("topic %v put messages timeout ", topic.GetFullName())
            return ErrPubToWaitTimeout
        }
    }
    <-info.Done
    return info.Err
}

// group commit loop
// 組提交迴圈, 在chan上面等待非同步提交寫入請求, 並嘗試一次提交所有等待中的請求,
// 請求完成後, 通過關閉對應請求的chan來通知客戶端結果.
func (c *context) internalPubLoop(topic *nsqd.Topic) {  
    messages := make([]*nsqd.Message, 0, 100)
    pubInfoList := make([]*nsqd.PubInfo, 0, 100)
    topicName := topic.GetTopicName()
    partition := topic.GetTopicPart()
    defer func() {
        done := false
        for !done {
            select {
            case info := <-topic.GetWaitChan():
                pubInfoList = append(pubInfoList, info)
            default:
                done = true
            }
        }
        for _, info := range pubInfoList {
            info.Err = nsqd.ErrExiting
            close(info.Done)
        }
    }()
    quitChan := topic.QuitChan()
    infoChan := topic.GetWaitChan()
    for {
        select {
        case <-quitChan:
            return
        case info := <-infoChan:
            if info.MsgBody.Len() <= 0 {
                nsqd.NsqLogger().Logf("empty msg body")
            }

            messages = append(messages, nsqd.NewMessage(0, info.MsgBody.Bytes()))
            pubInfoList = append(pubInfoList, info)

        default:
            if len(pubInfoList) == 0 {
                select {
                case <-quitChan:
                    return
                case info := <-infoChan:
                    messages = append(messages, nsqd.NewMessage(0, info.MsgBody.Bytes()))
                    pubInfoList = append(pubInfoList, info)
                }
                continue
            }
            var retErr error
            if c.checkForMasterWrite(topicName, partition) {
                _, _, _, err := c.PutMessages(topic, messages)
                if err != nil {
                    nsqd.NsqLogger().LogErrorf("topic %v put messages %v failed: %v", topic.GetFullName(), len(messages), err)
                    retErr = err
                }
            } else {
                topic.DisableForSlave()
                nsqd.NsqLogger().LogDebugf("should put to master: %v",
                    topic.GetFullName())
                retErr = consistence.ErrNotTopicLeader.ToErrorType()
            }
            for _, info := range pubInfoList {
                info.Err = retErr
                close(info.Done)
            }
            pubInfoList = pubInfoList[:0]
            messages = messages[:0]
        }
    }
}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101

其次, 針對不同的topic可以支援不同的刷盤策略, 靈活適配不同業務的刷盤需求. 在建立topic的時候, 可以指定每寫入多少條訊息刷盤一次, 這樣每個業務由於寫入的頻率不同, 把刷盤的請求打散了, 避免一次性過多的刷盤請求阻塞正常寫入. 另外, 還有一個後臺goroutine會在每隔固定時間選擇一部分topic分割槽強制刷盤, 保證資料及時落盤.

改造支援資料副本和HA

topic寫入落盤改造之後, 再來看看如何增加資料副本和HA特性. 整體流程可以參考架構圖, 做了類似Kafka的設計, 將每個topic的資料節點副本元資訊寫入etcd, 然後通過etcd選舉出每個topic的leader節點. 選舉的topic的leader節點負責自己topic的資料副本同步, 其他follower節點從leader節點同步topic資料.

元資料儲存

一些少量的元資料儲存在etcd, 保證整個叢集的元資料的一致性. 具體包括每個topic的配置資訊, 副本節點的分佈, 選舉出來的leader節點資訊, nsqd資料節點的配置資訊以及nsqlookupd資料查詢節點的配置資訊. 元資料樹結構圖如下:

IMAGE

由於元資料資料量很少, 變更也非常少, 因此本身對etcd的效能並沒有什麼要求. 另外nsqd資料節點和nsqlookupd查詢節點的保活也通過etcd來做.

leader選舉和HA

改造後的nsq架構, 每個topic需要一個leader節點負責處理讀寫請求和資料同步. 為了保證每個節點的負載趨於均衡, 我們通過nsqlookupd來選擇合適的topic的leader節點, 並通知給所有副本進行leader確認. leader節點會嘗試從etcd獲取topic對應的leader鎖確認leader有效.

當某個節點失效時, 會觸發etcd的watch事件, 從而觸發nsqlookupd重新選擇其他存活的節點作為topic的新leader, 完成leader的快速切換. 客戶端如果此時正在寫入老的leader也會觸發失敗重試, 並獲取最新的leader節點, 完成自動HA. 選舉具體流程如下:

leader election.png

資料副本同步和動態ISR

每個topic選舉出來的leader節點負責同步資料到所有副本. 為了支援副本節點的動態變化, 參考了Kafka的ISR(In synced replica)的設計. 和Kafka不同的是, 我們用push模式, 不是pull的模式, 來保證資料的同步複製, 避免資料同步不一致. 因此, 資料寫入首先由leader節點發起, 並且同步到所有ISR副本節點成功後, 才返回給客戶端. 如果同步ISR節點失敗, 則嘗試動態調整ISR並重試直到成功為止. 重試的過程中會檢查leader的有效性, 以及是否重複提交等條件. 寫入流程和ISR的動態調整流程如圖所示:

The progress of Dynamic ISR.png

通過動態ISR的調整, 可以保證失效的節點及時從ISR中清理掉, 從而減少失效節點對資料副本同步效能的影響.

資料同步落後的節點會從ISR移動到Catchup列表中, Catchup列表中的節點會自動嘗試追上leader的資料日誌, 追上後會通知leader進行ISR節點加入驗證的流程.

不管是leader同步複製資料到ISR節點列表, 還是catchup節點從leader拉取同步資料, 都是通過一個commitlog來維護本地資料的同步情況. commitlog維護的是每次寫請求的自增id, 以及該寫入請求對應於topic的磁碟佇列檔案的位置和資料大小, 有了commitlog, 我們就可以判斷每個副本的同步狀態, 以及下次需要從哪裡開始同步資料, 也可以判斷寫入請求是否是重複的已提交請求. 在leader選舉時以及加入新的ISR節點時也會判斷commitlog是否已經完全同步.

資料副本的配置支援topic級別的, 因此可以在一個叢集裡面對不同的topic配置不同的副本數, 來滿足不同的業務需求, 另外topic級別的配置隔離也會更方便對後面更多的特性進行隔離. 使得不同的業務topic可以使用不同的特性配置.

自動資料平衡

由於所有的topic都是leader負責處理客戶端的pub和sub請求, 因此為了保證各個節點的負載均衡, 我們實現了自動根據負載做資料平衡的功能. 通過自動平衡功能也可以實現在縮容或者擴容時, 自動的將資料遷移到其他機器上.

資料平衡是nsqlookupd通過定期收集各個nsqd資料節點的負載資訊, 包括CPU, 資料寫入量, topic的數量等資訊, 給每個nsqd的負載算出一個load值, 然後使用動態ISR流程, 調整topic的ISR節點分佈, 使得各個nsqd的節點的load的差距在合理範圍內. 為了避免遷移影響正常服務, 可以配置允許資料平衡的時間範圍.

分割槽支援

由於引入了leader, 原來可以隨便選擇nsqd節點進行讀寫的方式需要選擇leader進行讀寫, 這樣會導致可以讀寫的節點變少, 為了提高讀寫的可擴充套件性, 對topic引入分割槽的概念, 每個topic可以指定多個分割槽, 每個分割槽使用獨立的leader節點, 這樣保證每個topic可以有多個可以讀寫的分割槽節點.

改造消費channel

topic改造之後, channel本身不會儲存訊息資料了, 因此需要對原來某些消費特性做相應的處理, 同時也更方便引入新的消費特性.

處理消費cursor

每個消費channel維護的cursor會包含當前已經消費確認的檔案位置(channel confirmed), 以及下一條訊息讀取的檔案位置(channel next read), channel的下一條讀取位置和已經消費確認位置之間的差值, 就是當前正在等待確認ack的訊息.

每次投遞訊息給客戶端時會從channel next read位置讀取下一條訊息到記憶體並更新channel next read, 如果有多個客戶端連線則會讀取多條分別投遞給多個客戶端, 當客戶端ack某條訊息時, 會根據這條訊息的位置資訊判斷是否需要移動已確認的消費位置. 如果該條訊息的起點位置和channel confirmed位置重合時, 則更新channel confimed資訊.

可以看到, 當訊息亂序到來時, 已確認消費位置的遊標channel confirmed, 只能移動到目前連續的最小位置, 如果不是連續的, 則需要臨時記錄已經確認的所有非連續段的首尾位置作為一個confirmed segment, 每次ack一條訊息會判斷是否可以和現有的confirmed segment合併成一個大的segment. cursor處理過程如下圖所示:

channel-confirmed-cursor-update.png

正常情況下, 非連續的confirmed segment會非同步複製到副本節點上, 當leader失效時, 副本節點也可以知道哪些非連續段是已經被確認的訊息, 儘量減少重複投遞. 不過非同步同步如果異常, 並且節點重啟, 記憶體中的confirmed segment會丟失, channel next read會被重置到channel confirmed位置, 因此會出現部分的訊息重複投遞. 為了保證at-least-once的投遞目的, 我們需要容忍某些異常情況下的重複訊息.

處理重試和延時消費

可以看到, 改造後的channel可能會出現某些情況下待確認訊息視窗過大的問題, 這樣會導致記憶體中維護的confirmed segment過多, 記憶體可能佔用過多的問題. 特別是當某些訊息一直在重試時或者延時處理時, 會加劇這種情況. 為了避免這種多次重試或者延時消費的訊息影響正常的資料消費, 我們為每個channel增加了一個異常延時佇列, 保證這些訊息和正常的訊息消費進行一定程度的隔離.

正常情況下, 以及少量的重試時, 會一直在記憶體中維護這些待確認的訊息物件, 一旦有多次異常的訊息, 或者延時時間較長的訊息時, 我們將此類訊息自動從記憶體中移動到延時佇列中, 然後繼續投遞後面正常的訊息. 對於延時佇列中的訊息, 我們會定時的從佇列中讀出來進行重試. 由於延時佇列是儲存在boltdb的, 因此可以使用更少的記憶體處理大量的這種異常或者延時訊息, 雖然boltdb的寫入效能不佳, 但是我們這裡只用來儲存異常訊息, 因此這種異常情況下的效能完全可以滿足. 通過引入磁碟延時佇列, 改造後的channel在不影響正常訊息的情況下可以允許更多的重試以及更長的延時訊息, 相比於之前延時1小時的上限, 可以增加延時上限到幾天.

消費資料預讀優化

為了更進一步的優化消費效能, 減少讀IO, 在channel讀取訊息進行投遞的時候, 會自動讀取更多的資料放在buffer裡面, 下次直接讀取buffer即可, 直到buffer資料讀完之後, 再進行預讀下一部分資料, 通過預讀優化, 大大減少了磁碟隨機讀IO, 也減少了讀檔案的系統呼叫次數, 也會順便減少golang對於這種系統呼叫的切換開銷.

重放歷史資料

由於改造之後的所有channel都是使用cursor來引用實際的磁碟檔案的, 因此重放歷史資料進行消費這種特性就變得十分簡單了. 只需將cursor重新定位到歷史訊息的位置即可. 而歷史訊息的位置可以藉助commitlog進行搜尋來定位. 可以支援按照時間或者按照訊息佇列的某個位置開始重放.

嚴格順序消費

預設的消費模式, 是允許多個客戶端連結同時消費topic下的同一個分割槽的不同訊息的, 這樣可以使用最小的分割槽數來達到較高的併發消費能力, 避免了像Kafka那樣為了提高消費能力, 建立過多的分割槽, 也就避免了過多分割槽數帶來的磁碟隨機IO問題, 但是也由此帶來亂序投遞的問題. 實際場景中某些業務需要保證訊息嚴格按照生產寫入的順序進行投遞, 這種情況我們就需要在服務端控制投遞的策略了.

對於業務的這種場景, 我們引入了順序投遞的特性. 並在生產方也支援按照業務定製id進行分割槽hash的生產能力, 從而保證從生產到消費整條鏈路是按照分割槽內有序的方式進行訊息流轉的. 訊息生產方會根據業務的分割槽id將同樣的id投遞到同一個topic分割槽, 保證相同id的資料的順序一致性, 而在投遞時, 會調整併發消費策略, 保證同一時刻只會投遞一條訊息進行消費, 等待上一次ack成功後才繼續投遞下一條訊息. 同時, 為了避免leader切換時影響訊息的順序性, ack的訊息還會同步到所有副本才算成功.

可以看到這種方式單個分割槽的消費併發能力沒有亂序消費能力強, 主要取決於消費業務本身的處理能力, 為了提高更高的順序消費併發能力, 需要更多的分割槽數. 分割槽數的計算可以根據每條訊息的處理時間和每秒的訊息數來計算: 分割槽數=每秒訊息條數*每條訊息的處理時間(單位s).

引入Jepsen分散式測試的支援

由於此次改造變動非常大, 引入了更多的複雜性, 為了確保我們的分散式改造滿足預期的效果, 我們做了大量的異常測試, 當然也引入了業界流行的分散式測試工具Jepsen. 在各種異常測試過程中也發現了一些普通情況下非常難以發現的問題. 通過Jepsen測試之後, 我們也對改造後的系統更有信心了, 即使後面做更多改造也可以更加放心了.

下面是nsq跑的一部分jepsen測試場景

(deftest nsq-test
  (let [test (jepsen/run!
               (assoc
                 noop-test
                 :name       "nsq-simple-partition"
                 :os         debian/os
                 :db         db
                 :client     (queue-client)
                 :nemesis    (nemesis/partition-random-halves)
                 :model      (model/unordered-queue)
                 :checker    (checker/compose
                               {:total-queue checker/total-queue})
                 :generator  (gen/phases
                               (->> (gen/queue)
                                    (gen/delay 1/10)
                                    (gen/nemesis
                                      (gen/seq
                                        (cycle [(gen/sleep 30)
                                                {:type :info :f :start}
                                                (gen/sleep 60)
                                                {:type :info :f :stop}])))
                                    (gen/time-limit 360))
                               (gen/nemesis
                                 (gen/once {:type :info, :f :stop}))
                               (gen/log "waiting for recovery")
                               (gen/sleep 60)
                               (gen/clients
                                 (gen/each
                                   (gen/once {:type :invoke
                                              :f    :drain}))))))]
    (is (:valid? (:results test)))
    (report/to "report/queue.txt"
               (-> test :results pprint))))

123456789101112131415161718192021222324252627282930313233

整個流程就是通過jepsen控制節點, 隨機的斷開叢集裡面的某些節點的網路, 持續一段時間然後恢復, 如此反覆操作幾次. 在這些斷網過程中, 有多個客戶端不停的進行pub和sub操作並記錄所有的操作記錄, 最後再對所有的訊息進行對比, 判斷是否有任何pub成功的訊息沒有被消費到, 以及重複的訊息統計等. 通過對比結果就可以知道是否通過了jepsen測試的檢驗. 有了jepsen自動測試, 我們會在每次新版本釋出前跑多次jepsen測試指令碼, 確保沒有引入新的分散式問題.

訊息軌跡查詢系統

之前的NSQ系統由於訊息在chan裡面流轉一遍就沒了, 很難事後進行資料追查, 也沒有一個很好的方式去跟蹤一條訊息在從生產到被消費的各種中間狀態. 為了滿足業務方時不時的來排查業務訊息狀態的需求, 我們改造後的NSQ也支援動態的開啟這種訊息軌跡排查功能. 為了支援訊息軌跡查詢, 我們做了如下幾個工作:

  • 能通過訊息id定位到磁碟上的訊息內容
  • 支援傳入業務traceid, 並將業務traceid和nsq的訊息id關聯起來
  • 記錄一條訊息的所有中間態變化資訊
  • 將訊息的狀態變遷資訊同步到ES搜尋系統
  • nsqadmin提供方便的統一查詢入口
  • 支援針對topic和channel級別的動態跟蹤開關以減少資源消耗

通過這一套軌跡查詢系統, 在業務方需要排查問題的時候, 就能非常快速的找到異常訊息的軌跡從而快速的定位業務問題了. 特別是在排查順序消費的業務場景時, 經常能發現業務潛在的併發誤用問題.

nsq-trace-admin.png

總結

改造的過程中還特別需要注意和老版本的客戶端的相容性, 以減少客戶端的遷移改造成本. 除了以上幾個大的改造點, 還有很多小的改造細節, 感興趣的可以去研究下我們的開原始碼 NSQ

接下來我們還會有一篇文章講述我們正在新增的新功能以及接下來的計劃, 包括各種sink, connector, 訊息過濾, 事務訊息設計等等, 後面更多新的特性也會在將來不斷的補充和完善起來, 敬請期待, 也歡迎有志之士踴躍參與NSQ的開源改進計劃.