etcd原始碼閱讀(一):raftexample
開始讀etcd的原始碼,今天首先來看的是 raftexample,這是一個基於 raft 的簡單記憶體KV,希望通過 raftexample 能對 etcd 有一個大概的認識。
首先看一下目錄結構:
$ tree -d -L 1 . . ├── Documentation# 文件 ├── auth# 認證?還沒細看 ├── bin# 編譯出來的二進位制檔案 ├── client# 應該是v2版本的客戶端程式碼 ├── clientv3# 應該是v3版本的客戶端程式碼 ├── contrib# 今天我們要看的raftexample就在這裡面 ├── default.etcd# 執行編譯好的etcd產生的,忽略之 ├── docs# 文件 ├── embed# 封裝了etcd的函式,以便別的程式封裝 ├── etcdctl# etcdctl命令,也就是客戶端 ├── etcdmain# main.go 呼叫了這裡 ├── etcdserver# 服務端程式碼 ├── functional# 不知道是幹啥的,看起來是用來驗證功能的測試套件 ├── hack# 開發者用的,不知道幹啥的 ├── integration# 不知道幹啥的,忽略 ├── lease# 實現etcd的租約 ├── logos ├── mvcc # MVCC儲存的實現 ├── pkg# 通用庫 ├── proxy# 代理 ├── raft# raft一致性協議的實現 ├── scripts# 各種指令碼 ├── tests# 不曉得幹啥的,忽略 ├── tools# 一些工具,不知道幹啥的,忽略 ├── vendor# go的vendor,忽略 ├── version# 版本資訊 └── wal# Write-Ahead-Log的實現 27 directories
我的如何閱讀原始碼 這篇文章裡介紹過幾種閱讀 原始碼的方式,今天我們就要用上。
首先,看 main.go
檔案:
package main import ( "flag" "strings" "go.etcd.io/etcd/raft/raftpb" ) func main() { cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") id := flag.Int("id", 1, "node ID") kvport := flag.Int("port", 9121, "key-value server port") join := flag.Bool("join", false, "join an existing cluster") flag.Parse() proposeC := make(chan string) defer close(proposeC) confChangeC := make(chan raftpb.ConfChange) defer close(confChangeC) // raft provides a commit stream for the proposals from the http api var kvs *kvstore getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) }
可以看出來,大概就是弄了兩個channel,然後呢,新建了一個Raft的Node,新建了一個KV儲存,然後就開始提供HTTP服務。
然後跟進去,讀 newRaftNode
:
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { commitC := make(chan *string) errorC := make(chan error) rc := &raftNode{ proposeC:proposeC, confChangeC: confChangeC, commitC:commitC, errorC:errorC, id:id, peers:peers, join:join, waldir:fmt.Sprintf("raftexample-%d", id), snapdir:fmt.Sprintf("raftexample-%d-snap", id), getSnapshot: getSnapshot, snapCount:defaultSnapshotCount, stopc:make(chan struct{}), httpstopc:make(chan struct{}), httpdonec:make(chan struct{}), snapshotterReady: make(chan *snap.Snapshotter, 1), // rest of structure populated after WAL replay } go rc.startRaft() // 啟動raft return commitC, errorC, rc.snapshotterReady }
就是實力化了一個 raftNode,然後呢,呼叫了 raftNode.startRaft 這個方法,那就繼續跟進去:
func (rc *raftNode) startRaft() { if !fileutil.Exist(rc.snapdir) { if err := os.Mkdir(rc.snapdir, 0750); err != nil { log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) } } rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir) rc.snapshotterReady <- rc.snapshotter oldwal := wal.Exist(rc.waldir) rc.wal = rc.replayWAL() rpeers := make([]raft.Peer, len(rc.peers)) for i := range rpeers { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ ID:uint64(rc.id), ElectionTick:10, HeartbeatTick:1, Storage:rc.raftStorage, MaxSizePerMsg:1024 * 1024, MaxInflightMsgs:256, MaxUncommittedEntriesSize: 1 << 30, } // 設定 rc.node if oldwal { rc.node = raft.RestartNode(c) } else { startPeers := rpeers if rc.join { startPeers = nil } rc.node = raft.StartNode(c, startPeers) // 配置節點 } rc.transport = &rafthttp.Transport{ Logger:zap.NewExample(), ID:types.ID(rc.id), ClusterID:0x1000, Raft:rc, ServerStats: stats.NewServerStats("", ""), LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)), ErrorC:make(chan error), } rc.transport.Start() for i := range rc.peers { if i+1 != rc.id { rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) } } go rc.serveRaft()// 啟動HTTP服務 go rc.serveChannels() // 開始監聽各個channel然後消費 }
可以看出來,這個方法呢,就是先檢查是不是有快照,是不是有WAL日誌,如果有的話,就恢復到上一個狀態,如果沒有的話,就新建。 然後呼叫 raft.RestartNode
,這裡就是真正啟用raft一致性協議的地方了,這裡的raft就是最開始我們看的目錄裡的raft。這裡 接下來做的事情就是啟動一個transport,這嘎達呢,就跟指定的叢集裡其他節點通訊。然後起一個迴圈去消費之前建立的的channel裡的資料。 可以看到 rc.serveChannels
的程式碼:
func (rc *raftNode) serveChannels() { snap, err := rc.raftStorage.Snapshot() if err != nil { panic(err) } rc.confState = snap.Metadata.ConfState rc.snapshotIndex = snap.Metadata.Index rc.appliedIndex = snap.Metadata.Index defer rc.wal.Close() ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() // send proposals over raft go func() { confChangeCount := uint64(0) for rc.proposeC != nil && rc.confChangeC != nil { select { case prop, ok := <-rc.proposeC: log.Printf("received from rc.proposeC: prop: %+v, ok: %t", prop, ok) if !ok { rc.proposeC = nil } else { // blocks until accepted by raft state machine // 在此處,是kvstore.go裡的kvstore rc.node.Propose(context.TODO(), []byte(prop)) } case cc, ok := <-rc.confChangeC: if !ok { rc.confChangeC = nil } else { confChangeCount++ cc.ID = confChangeCount rc.node.ProposeConfChange(context.TODO(), cc) } } } // client closed channel; shutdown raft if not already close(rc.stopc) }() // event loop on raft state machine updates for { select { case <-ticker.C: rc.node.Tick() // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } rc.maybeTriggerSnapshot() rc.node.Advance() case err := <-rc.transport.ErrorC: rc.writeError(err) return case <-rc.stopc: rc.stop() return } } }
這段程式碼就比較長了,先看第一個 go func()
裡的迴圈,就是監聽最開始建立的兩個channel,然後分別呼叫對應的介面,要注意, rc.node
的型別是 raft.Node
,這是一個介面。上面說了,例項化的時候,是呼叫 raft.StartNode
或者 raft.RestartNode
, 其返回結果是一個 raft.Node
,實際上程式碼返回的是 raft.node
,而 raft.node
實現了 raft.Node
這個介面,會不會有點暈?
所以呢,接下來我們要看看 rc.proposeC
這個channel, rc.confChangeC
我們就不看了,雖然不知道是幹啥的,但是呢,從名字我們先 猜測它是用來做配置變更的(實際上就是)。看 rc.proposeC
,我們就要看這個channel在哪些地方用到了,也就是說,哪裡有生產者, 哪裡有消費者。搜尋一下:
$ ack proposeC raftexample_test.go 29: proposeC[]chan string 44:proposeC:make([]chan string, len(peers)), 51:clus.proposeC[i] = make(chan string, 1) 53:clus.commitC[i], clus.errorC[i], _ = newRaftNode(i+1, clus.peers, false, nil, clus.proposeC[i], clus.confChangeC[i]) 73:close(clus.proposeC[i]) 123:}(clus.proposeC[i], clus.commitC[i], clus.errorC[i]) 126:go func(i int) { clus.proposeC[i] <- "foo" }(i) 151:clus.proposeC[0] <- "foo" 152:clus.proposeC[0] <- "bar" kvstore.go 29: proposeCchan<- string // channel for proposing updates 40:func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { 41: s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} 61: log.Printf("s.proposeC <- %s", buf.String()) 62: s.proposeC <- buf.String() raft.go 42: proposeC<-chan string// proposed messages (k,v) 80:// current), then new log entries. To shutdown, close proposeC and read errorC. 81:func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, 88:proposeC:proposeC, 401:for rc.proposeC != nil && rc.confChangeC != nil { 403:case prop, ok := <-rc.proposeC: 404:log.Printf("received from rc.proposeC: prop: %+v, ok: %t", prop, ok) 406:rc.proposeC = nil main.go 31: proposeC := make(chan string) 32: defer close(proposeC) 39: commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) 41: kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
可以看出來, kvstore.go
的62行應該是生產者,而 raft.go
的403行應該是消費者。基於這種假設,我們要去驗證一下,所以我加了幾行日誌, 那就執行一下:
$ curl -v -L http://127.0.0.1:9121/my-key -XPUT -d hello
下圖是服務端的日誌輸出:
可以看出來,順序是先呼叫了 s.proposeC <-
然後 received from rc.proposeC
,然後raft對訊息進行處理,把它還原成函式呼叫,就是:
-
func (s *kvstore) Propose(k string, v string)
-
func (rc *raftNode) serveChannels()
-
func (n *node) Propose(ctx context.Context, data []byte) error
,這裡的node是etcd/raft/node.go
裡的結構體type node struct
為啥呢,我們要和raftexample裡的程式碼對應上。繼續看 raftexample/main.go
:
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
可以看到, main.go
最後其實是執行了 serveHttpKVAPI
,然後跳進去一看,會發現,呼叫了 httpKVAPI
,我們知道,go的HTTP服務 程式碼滿足 ServeHTTP
這個介面就可以了,如果你不知道的話,說明沒有讀過 net/http
的程式碼,那麼你現在知道了。所以直接跳過去看 httpKVAPI
的 ServeHTTP
這個方法,可以看到:
type httpKVAPI struct { store*kvstore confChangeC chan<- raftpb.ConfChange } func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { key := r.RequestURI switch { case r.Method == "PUT": v, err := ioutil.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) // Optimistic-- no waiting for ack from raft. Value is not yet // committed so a subsequent GET on the key may return old value w.WriteHeader(http.StatusNoContent)
h.store.Propose(key, string(v))
這裡就是入口了。按照我們之前說的順序, h.store.Propose(key, string(v))
會呼叫 s.proposeC <- buf.String()
, 然後這個時候就會喚醒 serveChannels
裡的 case prop, ok := <-rc.proposeC
,然後呼叫 rc.node.Propose(context.TODO(), []byte(prop))
, 接下來就會呼叫 etcd/raft/node.go
裡的 func (n *node) Propose(ctx context.Context, data []byte) error
這個函式。
好了,到此為止我們就知道etcd大概是怎麼一個工作法,這篇部落格到此結束。
接下來我們會繼續探索真正的etcd裡的各個細節。