etcd原始碼閱讀(二):raft
今天講的是raft
這個資料夾下的內容。我覺得etcd的程式碼寫得不夠好,當然,也有可能是因為我外行,不過這只是我的感受,不喜勿噴。
首先要看一下doc.go
這個檔案,裡面寫了很多註釋,有利於理解,此外看raft
資料夾下的程式碼,結合上一篇所說的raftexample一起來
理解,效果更佳。
首先,先把doc.go
裡的內容大概說一下:
-
新建一個raft叢集,使用
raft.StartNode
,傳入一個Config和其他節點的ID -
從已有的資料恢復raft叢集,使用
raft.RestartNode
,傳入一個Config即可 -
raft這個包只實現了raft協議,其餘的例如資料持久化,處理資料等等,需要呼叫這個包的程式碼來做,呼叫者要做的事情包括:
-
呼叫
Node.Ready()
接受目前產生的更新,然後:- 把HardState,Entries和Snapshot持久化到硬盤裡
- 把資訊傳送到To所指定的節點裡去
- 把快照和已經提交的Entry應用到狀態機裡去
-
呼叫
Node.Advance()
通知Node之前呼叫Node.Ready()
所接受的資料已經處理完畢
-
所有持久化的操作都必須使用滿足
Storage
這個介面的實現來進行持久化 -
當接收到其他節點發來的訊息時,呼叫
Node.Step
這個函式 -
每隔一段時間需要主動呼叫一次
Node.Tick()
把上面的幾個步驟集中起來,差不多是這麼些程式碼:
go for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): saveToStorage(rd.State, rd.Entries, rd.Snapshot) send(rd.Messages) if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } s.Node.Advance() case <-s.done: return } }
-
呼叫
-
要處理接收到的請求,就去呼叫
Node.Propose
,如果請求被提交了,就會出現在CommittedEntries
裡,並且狀態是raftpb.EntryNormal
-
MessageType 有很多種型別,詳見
doc.go
看完doc.go
之後,我們再來看一眼 raftexample 中,rc.serveChannels
裡的那一段程式碼:
case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } rc.maybeTriggerSnapshot() rc.node.Advance()
是不是和doc.go
裡說的一毛一樣?
好了,接下來開始說raft
的實現。為啥我覺得程式碼寫的不好呢?因為實現上,有兩個結構體,一個是node
,一個是raft
。我看下
來的感受是:
-
node
負責節點相關的一些東西,raft
負責raft協議相關的東西,可能他們是想這麼分開來,但實際上,node.run
的時候就需要 傳入一個raft
,也就是說其實執行的時候node
和raft
是捆綁在一起的。直接把它們放在一起,程式碼複雜度可以下降很多。而且他們本身 在raft協議裡就是在一起的。 -
raft
結構體裡,表示各個節點的ID,使用的是一個 uint64,然後需要由呼叫者去自己根據ID追蹤具體是什麼URL,既然註定了要跨網路,何不把 網路操作封裝在接口裡,然後raft庫本身來通過介面完成操作?這樣可以進一步降低理解成本。
然後接下來我們結合raftexample
的程式碼來理解raft
資料夾下的程式碼,注意,我們目前暫時不去看etcdserver
下的程式碼,那裡是真正
跑etcd的程式碼,我瞄了一眼,等我們先多看幾個底層的東西,再去看那裡。上一篇文章 我們說到,raftexample
裡呼叫順序是:
-
main
函式 -
newRaftNode
-
新建一個
raftNode
並且呼叫raftNode.startRaft
,raftNode.startRaft
做的事情就是:rc.transport rc.serveRaft() rc.serveChannels()
-
新建一個
-
newKVStore
-
serveHttpKVAPI
serveChannels
做的事情呢,就是不斷的接受rc.proposeC
裡的資訊,而rc.proposeC
資訊的來源呢,就是serveHttpKVAPI
裡的HTTP介面接收到
請求,然後給塞進去的。serveChannels
接收到rc.proposeC
裡的資訊呢,就呼叫rc.node.Propose
,這玩意兒呢,就是raft
資料夾裡,node.go
的Propose
函式,因為他是個介面,而真正的實現就是raft/node.go
裡的Propose
方法。
而raft/node.go
裡的Propose
方法呢,最後就會呼叫func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error
,
它做的事情就是,把訊息放到n.propc
這個channel裡,如果需要等待,那麼就等待:
select { case rsp := <-pm.result: // 要等待的話,如果result不為空就返回,否則不返回(那就會執行到下面,返回nil) if rsp != nil { return rsp }
那麼哪裡會處理n.propc
的訊息呢?就在func (n *node) run(r *raft)
這個函式裡:
select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. case pm := <-propc: // proposal 是有結果的訊息,應該是用來等待是否成功處理的 m := pm.m m.From = r.id err := r.Step(m) // 注意,Step 是一個函式,這個函式用來處理訊息。但是不同的身份有不同的Step實現,點進去看一下default裡的程式碼,就呼叫了。參見 raft.go->becomeFollower, raft.go->becomeCandidate等等裡的stepXXX函式 if pm.result != nil { pm.result <- err close(pm.result) } case m := <-n.recvc: // 收到訊息,這裡的訊息應該是不等待結果的 // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) }
然後呢,你就發現,他收到訊息之後,就會呼叫r.Step
,實現在raft.go
裡:
// Step 就是傳說中的狀態機了 func (r *raft) Step(m pb.Message) error { // Handle the message term, which may result in our stepping down to a follower. switch {
這裡呢,就是raft狀態機,就是那一坨,如果訊息的Term比自己的大,就主動變Follower那一坨規則。當然了,狀態機我還沒有仔細研究
每一個狀態,畢竟,一開始就太深入細節,不方便理解,讀程式碼的時候還是要注意不能一葉障目。瞄幾眼,發現很多地方呢,其實會呼叫r.send
,然後呢,這嘎達,長這樣:
// send persists state to stable storage and then sends to its mailbox. // TODO: send先持久化,然後傳送到mailbox,那麼問題來了,mailbox是什麼? func (r *raft) send(m pb.Message) { m.From = r.id if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { if m.Term == 0 { // All {pre-,}campaign messages need to have the term set when // sending. // - MsgVote: m.Term is the term the node is campaigning for, //non-zero as we increment the term when campaigning. // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was //granted, non-zero for the same reason MsgVote is // - MsgPreVote: m.Term is the term the node will campaign, //non-zero as we use m.Term to indicate the next term we'll be //campaigning for // - MsgPreVoteResp: m.Term is the term received in the original //MsgPreVote if the pre-vote was granted, non-zero for the //same reasons MsgPreVote is panic(fmt.Sprintf("term should be set when sending %s", m.Type)) } } else { if m.Term != 0 { panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) } // do not attach term to MsgProp, MsgReadIndex // proposals are a way to forward to the leader and // should be treated as local message. // MsgReadIndex is also forwarded to leader. if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { m.Term = r.Term } } // TODO: 哪有持久化??? r.msgs = append(r.msgs, m) }
就是把訊息追加到r.msgs
這個 slice 裡。繞這麼大一圈,你說這實現閒的蛋疼麼。那麼現在的新問題是,哪裡消費了r.msgs
?
畢竟,r.msgs
目前還屬於在記憶體的訊息,註釋裡說要持久化,也沒看到哪裡持久化了。於是我就搜尋了一下r.msgs
:
$ ack -Q 'r.msgs' rawnode.go 218:if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() { raft_test.go 52: msgs := r.msgs 53: r.msgs = make([]pb.Message, 0) 738:if len(r.msgs) != 1 { 739:t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs) 741:resp := r.msgs[0] raft.go 470:r.msgs = append(r.msgs, m) node.go 427:r.msgs = nil // 不是併發安全的啊 609:Messages:r.msgs,
我看node.go
427行最可疑,就點進去看了一下,還真是!這叫基於瞎猜的程式碼閱讀法。。。其實很多人讀程式碼一開始都是東看看西看看,
自頂向下嘛,但是難免會有思路斷開的時候,這種時候呢,繼續多看看,自然到後邊就會連起來。但是很多寫原始碼分析的人不會寫出來,
我這可是說了大實話了。
case readyc <- rd: // Ready是各種準備好的變更 if rd.SoftState != nil { prevSoftSt = rd.SoftState } if len(rd.Entries) > 0 { prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term havePrevLastUnstablei = true } if !IsEmptyHardState(rd.HardState) { prevHardSt = rd.HardState } if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } if index := rd.appliedCursor(); index != 0 { applyingToI = index } r.msgs = nil // 不是併發安全的啊 r.readStates = nil r.reduceUncommittedSize(rd.CommittedEntries) advancec = n.advancec
看下rd
是啥,原來是rd = newReady(r, prevSoftSt, prevHardSt)
。進去看看newReady
幹了什麼:
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd := Ready{ Entries:r.raftLog.unstableEntries(), CommittedEntries: r.raftLog.nextEnts(), Messages:r.msgs, } if softSt := r.softState(); !softSt.equal(prevSoftSt) { rd.SoftState = softSt } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot } if len(r.readStates) != 0 { rd.ReadStates = r.readStates } rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) return rd }
原來就是把r.msgs
塞到r.msgs
,然後置空r.msgs
。好了,大概曉得了。
要注意到,r.msgs = nil
出現的這段程式碼,在我們上面說過的node.run
這個函式裡。上面我們說了,run
函式還有一個分支是case pm := <-propc
,我把幾個分支抽出來看看:
for { // 略略略,準備 rd select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. case pm := <-propc: // proposal 是有結果的訊息,應該是用來等待是否成功處理的 m := pm.m m.From = r.id err := r.Step(m) // 注意,Step 是一個函式,這個函式用來處理訊息。但是不同的身份有不同的Step實現,點進去看一下default裡的程式碼,就呼叫了。參見 raft.go->becomeFollower, raft.go->becomeCandidate等等裡的stepXXX函式 if pm.result != nil { pm.result <- err close(pm.result) } case m := <-n.recvc: // 收到訊息,這裡的訊息應該是不等待結果的 // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } case cc := <-n.confc: // 配置變更 // 略略略 case <-n.tickc: // 心跳和選舉的timeout,參見doc.go r.tick() case readyc <- rd: // Ready是各種準備好的變更 // 略略略 case <-advancec: // 這個是用來確認Ready已經處理完的 // 略略略 case c := <-n.status: // TODO: 好像也是狀態變更??? c <- getStatus(r) case <-n.stop: // 那就是stop咯 close(n.done) return } }
原來,是這樣的。真的是有點繞啊。好了,這一篇分析就到這裡了,其他的細節等我繼續更新吧 :)