1. 程式人生 > >死磕以太坊原始碼分析之downloader同步

死磕以太坊原始碼分析之downloader同步

> 死磕以太坊原始碼分析之downloader同步 > > **需要配合註釋程式碼看**:https://github.com/blockchainGuide/ > > *這篇文章篇幅較長,能看下去的是條漢子,建議收藏* > > 希望讀者在閱讀過程中,指出問題,給個關注,一起探討。 ## 概覽 `downloader` 模組的程式碼位於 `eth/downloader` 目錄下。主要的功能程式碼分別是: - `downloader.go` :實現了區塊同步邏輯 - `peer.go` :對區塊各個階段的組裝,下面的各個`FetchXXX` 就是很依賴這個模組。 - `queue.go` :對`eth/peer.go`的封裝 - `statesync.go` :同步`state`物件 ## 同步模式 ### full sync full 模式會在資料庫中儲存所有區塊資料,同步時從遠端節點同步 header 和 body 資料,而state 和 receipt 資料則是在本地計算出來的。 在 full 模式下,downloader 會同步區塊的 header 和 body 資料組成一個區塊,然後通過 blockchain 模組的 `BlockChain.InsertChain` 向資料庫中插入區塊。在 `BlockChain.InsertChain` 中,會逐個計算和驗證每個塊的 `state` 和 `recepit` 等資料,如果一切正常就將區塊資料以及自己計算得到的 `state`、`recepit` 資料一起寫入到資料庫中。 ### fast sync `fast` 模式下,`recepit` 不再由本地計算,而是和區塊資料一樣,直接由 `downloader` 從其它節點中同步;`state` 資料並不會全部計算和下載,而是選一個較新的區塊(稱之為 `pivot`)的 `state` 進行下載,以這個區塊為分界,之前的區塊是沒有 `state` 資料的,之後的區塊會像 `full` 模式下一樣在本地計算 `state`。因此在 `fast` 模式下,同步的資料除了 `header` 和 body,還有 `receipt`,以及 `pivot` 區塊的 `state`。 因此 `fast` 模式忽略了大部分 `state` 資料,並且使用網路直接同步 `receipt` 資料的方式替換了 full 模式下的本地計算,所以比較快。 ### light sync light 模式也叫做輕模式,它只對區塊頭進行同步,而不同步其它的資料。 SyncMode: - FullSync:從完整區塊同步整個區塊鏈歷史 - FastSync:快速下載標題,僅在鏈頭處完全同步 - LightSync:僅下載標題,然後終止 ## 區塊下載流程 > 圖片只是大概的描述一下,實際還是要結合程式碼,**所有區塊鏈相關文章合集**,https://github.com/blockchainGuide/ > > 同時希望結識更多區塊鏈圈子的人,可以star上面專案,持續更新 ![image-20201222221031797](https://tva1.sinaimg.cn/large/0081Kckwgy1glwzmv4zyej30zh0u0dps.jpg) 首先根據`Synchronise`開始區塊同步,通過`findAncestor`找到指定節點的共同祖先,並在此高度進行同步,同時開啟多個`goroutine`同步不同的資料:`header`、`receipt`、`body`。假如同步高度為 100 的區塊,必須先`header`同步成功同步完成才可以喚醒`body`和`receipts`的同步。 而每個部分的同步大致都是由`FetchParts`來完成的,裡面包含了各個`Chan`的配合,也會涉及不少的回撥函式,總而言之多讀幾遍每次都會有不同的理解。接下來就逐步分析這些關鍵內容。 ------ ## synchronise ①:確保對方的TD高於我們自己的TD ```go currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return } ``` ②:開啟`downloader`的同步 ```go pm.downloader.Synchronise(peer.id, pHead, pTd, mode) ``` 進入函式:主要做了以下幾件事: 1. `d.synchronise(id, head, td, mode)` :同步過程 2. 錯誤日誌輸出, 並刪除此`peer`。 進入到`d.synchronise`,走到最後一步`d.syncWithPeer(p, hash, td)`真正開啟同步。 ```go func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { ... return d.syncWithPeer(p, hash, td) } ``` syncWithPeer大概做了以下幾件事: 1. 查詢祖先`findAncestor` 2. 開啟單獨`goroutine`分別執行以下幾個函式: - fetchHeaders - processHeaders - fetchbodies - fetchReceipts - processFastSyncContent - processFullSyncContent 接下來的文章,以及整個`Downloader`模組主要內容就是圍繞這幾個部分進行展開。 ------- ## findAncestor 同步首要的是**確定同步區塊的區間**:頂部為遠端節點的最高區塊,底部為兩個節點都擁有的相同區塊的最高高度(祖先區塊)。`findAncestor`就是用來找祖先區塊。函式分析如下: ①:確定本地高度和遠端節點的最高高度 ```go var ( floor = int64(-1) // 底部 localHeight uint64 // 本地最高高度 remoteHeight = remoteHeader.Number.Uint64() // 遠端節點最高高度 ) switch d.mode { case FullSync: localHeight = d.blockchain.CurrentBlock().NumberU64() case FastSync: localHeight = d.blockchain.CurrentFastBlock().NumberU64() default: localHeight = d.lightchain.CurrentHeader().Number.Uint64() } ``` ②:計算同步的高度區間和間隔 ```go from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight) ``` - `from`::表示從哪個高度開始獲取區塊 - `count`:表示從遠端節點獲取多少個區塊 - `skip`:表示間隔,比如`skip` 為 2 ,獲取第一個高度為 5,則第二個就是 8 - `max`:表示最大高度 ③:傳送獲取`header`的請求 ```go go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false) ``` ④:處理上面請求接收到的`header` :`case packet := <-d.headerCh` 1. 丟棄掉不是來自我們請求節的內容 2. 確保返回的`header`數量不為空 3. 驗證返回的`headers`的高度是我們所請求的 4. 檢查是否找到共同祖先 ```go //----① if packet.PeerId() != p.id { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } //-----② headers := packet.(*headerPack).headers if len(headers) == 0 { p.log.Warn("Empty head header set") return 0 } //-----③ for i, header := range headers { expectNumber := from + int64(i)*int64(skip+1) if number := header.Number.Int64(); number != expectNumber { // 驗證這些返回的header是否是我們上面請求的headers p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) return 0, errInvalidChain } } //-----④ // 檢查是否找到共同祖先 finished = true //注意這裡是從headers最後一個元素開始查詢,也就是高度最高的區塊。 for i := len(headers) - 1; i >= 0; i-- { // 跳過不在我們請求的高度區間內的區塊 if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max { continue } // //檢查我們本地是否已經有某個區塊了,如果有就算是找到了共同祖先, //並將共同祖先的雜湊和高度設定在number和hash變數中。 h := headers[i].Hash() n := headers[i].Number.Uint64() ``` ⑤:如果通過固定間隔法找到了共同祖先則返回祖先,會對其高度與 `floor` 變數進行驗證, `floor` 變數代表的是共同祖先的高度的最小值,如果找到共同祖先的高度比這個值還小,就認為是兩個節點之間分叉太大了,不再允許進行同步。如果一切正常,就返回找到的共同祖先的高度 `number` 變數。 ```GO if hash != (common.Hash{}) { if int64(number) <= floor { return 0, errInvalidAncestor } return number, nil } ``` ⑥:如果固定間隔法沒有找到祖先則通過二分法來查詢祖先,這部分可以思想跟二分法演算法類似,有興趣的可以細看。 ------ ## queue詳解 `queue`物件和`Downloader`物件是相互作用的,`Downloader`的很多功能離不開他,接下來我們介紹一下這部分內容,但是本節,**可以先行跳過**,等到了閱讀下面的關於`Queue`呼叫的一些函式部分再回過來閱讀這部分講解。 ### queue結構體 ```go type queue struct { mode SyncMode // 同步模式 // header處理相關 headerHead common.Hash //最後一個排隊的標頭的雜湊值以驗證順序 headerTaskPool map[uint64]*types.Header //待處理的標頭檢索任務,將起始索引對映到框架標頭 headerTaskQueue *prque.Prque //骨架索引的優先順序佇列,以獲取用於的填充標頭 headerPeerMiss map[string]map[uint64]struct{} //已知不可用的對等頭批處理集 headerPendPool map[string]*fetchRequest //當前掛起的頭檢索操作 headerResults []*types.Header //結果快取累積完成的頭 headerProced int //從結果中拿出來已經處理的header headerContCh chan bool //header下載完成時通知的頻道 blockTaskPool map[common.Hash]*types.Header //待處理的塊(body)檢索任務,將雜湊對映到header blockTaskQueue *prque.Prque //標頭的優先順序佇列,以用於獲取塊(bodies) blockPendPool map[string]*fetchRequest //當前的正在處理的塊(body)檢索操作 blockDonePool map[common.Hash]struct{} //已經完成的塊(body) receiptTaskPool map[common.Hash]*types.Header //待處理的收據檢索任務,將雜湊對映到header receiptTaskQueue *prque.Prque //標頭的優先順序佇列,以用於獲取收據 receiptPendPool map[string]*fetchRequest //當前的正在處理的收據檢索操作 receiptDonePool map[common.Hash]struct{} //已經完成的收據 resultCache []*fetchResult //下載但尚未交付獲取結果 resultOffset uint64 //區塊鏈中第一個快取的獲取結果的偏移量 resultSize common.StorageSize // 塊的近似大小 lock *sync.Mutex active *sync.Cond closed bool } ``` ### 主要細分功能 #### 資料下載開始安排任務 - `ScheduleSkeleton`:*將一批`header`檢索任務新增到佇列中,以填充已檢索的`header skeleton`* - `Schedule`:*用來準備對一些 `body` 和 `receipt` 資料的下載* #### 資料下載中的各類狀態 - `pending` `pending`表示待檢索的XXX請求的數量,包括了:`PendingHeaders`、`PendingBlocks`、`PendingReceipts`,分別都是對應取`XXXTaskQueue`的長度。 - `InFlight` `InFlight`表示是否有正在獲取XXX的請求,包括:`InFlightHeaders`、`InFlightBlocks`、`InFlightReceipts`,都是通過判斷`len(q.receiptPendPool) > 0` 來確認。 - `ShouldThrottle` `ShouldThrottle`表示檢查是否應該限制下載XXX,包括:`ShouldThrottleBlocks`、`ShouldThrottleReceipts`,主要是為了防止下載過程中本地記憶體佔用過大。 - `Reserve` `Reserve`通過構造一個 `fetchRequest` 結構並返回,向呼叫者提供指定數量的待下載的資料的資訊(`queue` 內部會將這些資料標記為「正在下載」)。呼叫者使用返回的 `fetchRequest` 資料向遠端節點發起新的獲取資料的請求。包括:`ReserveHeaders`、`ReserveBodies`、`ReserveReceipts`。 - `Cancel` `Cance`用來撤消對 `fetchRequest` 結構中的資料的下載(`queue` 內部會將這些資料重新從「正在下載」的狀態更改為「等待下載」)。包括:`CancelHeaders`、`CancelBodies`、`CancelReceipts`。 - `expire` `expire`檢查正在執行中的請求是否超過了超時限制,包括:`ExpireHeaders`、`ExpireBodies`、`ExpireReceipts`。 - `Deliver` 當有資料下載成功時,呼叫者會使用 `deliver` 功能用來通知 `queue` 物件。包括:`DeliverHeaders`、`DeliverBodies`、`DeliverReceipts`。 #### 資料下載完成獲取區塊資料 - `RetrieveHeaders` 在填充 `skeleton` 完成後,`queue.RetrieveHeaders` 用來獲取整個 `skeleton` 中的所有 `header`。 - `Results` `queue.Results` 用來獲取當前的 `header`、`body` 和 `receipt`(只在 `fast` 模式下) 都已下載成功的區塊(並將這些區塊從 `queue` 內部移除) ------ ### 函式實現 #### ScheduleSkeleton queue.ScheduleSkeleton主要是為了填充skeleton,它的引數是要下載區塊的起始高度和所有 `skeleton` 區塊頭,最核心的內容則是下面這段迴圈: ```go func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { ...... for i, header := range skeleton { index := from + uint64(i*y) q.headerTaskPool[index] = header q.headerTaskQueue.Push(index, -int64(index)) } } ``` 假設已確定需要下載的區塊高度區間是從 10 到 46,`MaxHeaderFetch` 的值為 10,那麼這個高度區塊就會被分成 3 組:10 - 19,20 - 29,30 - 39,而 skeleton 則分別由高度為 19、29、39 的區塊頭組成。迴圈中的 `index` 變數實際上是每一組區塊中的第一個區塊的高度(比如 10、20、30),`queue.headerTaskPool` 實際上是一個**每一組區塊中第一個區塊的高度到最後一個區塊的 header 的對映** ```go headerTaskPool = { 10: headerOf_19, 20: headerOf_20, 30: headerOf_39, } ``` ---- #### ReserveHeaders `reserve` 用來獲取可下載的資料。 ```go reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil } ``` ```go func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { if _, ok := q.headerPendPool[p.id]; ok { return nil } //① ... send, skip := uint64(0), []uint64{} for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { skip = append(skip, from.(uint64)) continue } } send = from.(uint64) // ② } ... for _, from := range skip { q.headerTaskQueue.Push(from, -int64(from)) } // ③ ... request := &fetchRequest{ Peer: p, From: send, Time: time.Now(), } q.headerPendPool[p.id] = request // ④ } ``` ①:根據`headerPendPool`來判斷遠端節點是否正在下載資料資訊。 ②:從`headerTaskQueue`取出值作為本次請求的起始高度,賦值給`send`變數,在這個過程中會排除headerPeerMiss所記錄的節點下載資料失敗的資訊。 ③:將失敗的任務再重新寫回`task queue` ④:利用`send`變數構造`fetchRequest`結構,此結構是用來作為`FetchHeaders`來使用的: ```go fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } ``` 至此,`ReserveHeaders`會從任務佇列裡選擇最小的起始高度並構造`fetchRequest`傳遞給`fetch`獲取資料。 ----- #### DeliverHeaders ```go deliver = func(packet dataPack) (int, error) { pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) } ``` ①:如果發現下載資料的節點沒有在 `queue.headerPendPool` 中,就直接返回錯誤;否則就繼續處理,並將節點記錄從 `queue.headerPendPool` 中刪除。 ```go request := q.headerPendPool[id] if request == nil { return 0, errNoFetchesPending } headerReqTimer.UpdateSince(request.Time) delete(q.headerPendPool, id) ``` ②:驗證`headers` 包括三方面驗證: 1. 檢查起始區塊的高度和雜湊 2. 檢查高度的連線性 3. 檢查雜湊的連線性 ```go if accepted { //檢查起始區塊的高度和雜湊 if headers[0].Number.Uint64() != request.From { ... accepted = false } else if headers[len(headers)-1].Hash() != target { ... accepted = false } } if accepted { for i, header := range headers[1:] { hash := header.Hash() // 檢查高度的連線性 if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { ... } if headers[i].Hash() != header.ParentHash { // 檢查雜湊的連線性 ... } } } ``` ③: 將無效資料存入`headerPeerMiss`,並將這組區塊起始高度重新放入`headerTaskQueue` ```go if !accepted { ... miss := q.headerPeerMiss[id] if miss == nil { q.headerPeerMiss[id] = make(map[uint64]struct{}) miss = q.headerPeerMiss[id] } miss[request.From] = struct{}{} q.headerTaskQueue.Push(request.From, -int64(request.From)) return 0, errors.New("delivery not accepted") } ``` ④:儲存資料,並通知`headerProcCh`處理新的`header` ```go if ready > 0 { process := make([]*types.Header, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { case headerProcCh <- process: q.headerProced += len(process) default: } } ``` ⑤:傳送訊息給.`headerContCh`,通知`skeleton` 都被下載完了 ```go if len(q.headerTaskPool) == 0 { q.headerContCh <- false } ``` `DeliverHeaders` 會對資料進行檢驗和儲存,併發送 channel 訊息給 `Downloader.processHeaders` 和 `Downloader.fetchParts`的 `wakeCh` 引數。 ----- #### Schedule `processHeaders`在處理`header`資料的時候,會呼叫`queue.Schedule` 為下載 `body` 和 `receipt` 作準備。 ```go inserts := d.queue.Schedule(chunk, origin) ``` ```GO func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { inserts := make([]*types.Header, 0, len(headers)) for _, header := range headers { //校驗 ... q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) if q.mode == FastSync { q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } inserts = append(inserts, header) q.headerHead = hash from++ } return inserts } ``` 這個函式主要就是將資訊寫入到body和receipt佇列,等待排程。 ------ #### ReserveBody&Receipt 在 `queue` 中準備好了 **body** 和 **receipt** 相關的資料, `processHeaders`最後一段,是喚醒下載Bodyies和Receipts的關鍵程式碼,會通知 `fetchBodies` 和 `fetchReceipts` 可以對各自的資料進行下載了。 ```go for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } ``` 而`fetchXXX` 會呼叫`fetchParts`,邏輯類似上面的的,`reserve`最終則會呼叫`reserveHeaders`,`deliver` 最終呼叫的是 `queue.deliver`. 先來分析`reserveHeaders`: ①:如果沒有可處理的任務,直接返回 ```go if taskQueue.Empty() { return nil, false, nil } ``` ②:如果引數給定的節點正在下載資料,返回 ```go if _, ok := pendPool[p.id]; ok { return nil, false, nil } ``` ③:計算 queue 物件中的快取空間還可以容納多少條資料 ```go space := q.resultSlots(pendPool, donePool) ``` ④:從 「task queue」 中依次取出任務進行處理 主要實現以下功能: - 計算當前 header 在 `queue.resultCache` 中的位置,然後填充 `queue.resultCache` 中相應位置的元素 - 處理空區塊的情況,若為空不下載。 - 處理遠端節點缺少這個當前區塊資料的情況,如果發現這個節點曾經下載當前資料失敗過,就不再讓它下載了。 注意:`resultCache` 欄位用來記錄所有正在被處理的資料的處理結果,它的元素型別是 `fetchResult` 。它的 `Pending` 欄位代表當前區塊還有幾類資料需要下載。這裡需要下載的資料最多有兩類:body 和 receipt,`full` 模式下只需要下載 `body` 資料,而 `fast` 模式要多下載一個 `receipt` 資料。 ```go for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) hash := header.Hash() index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { .... } if q.resultCache[index] == nil { components := 1 if q.mode == FastSync { components = 2 } q.resultCache[index] = &fetchResult{ Pending: components, Hash: hash, Header: header, } } if isNoop(header) { donePool[hash] = struct{}{} delete(taskPool, hash) space, proc = space-1, proc-1 q.resultCache[index].Pending-- progress = true continue } if p.Lacks(hash) { skip = append(skip, header) } else { send = append(send, header) } } ``` 最後就是構造 `fetchRequest` 結構並返回。 ----- #### DeliverBodies&Receipts `body` 或 `receipt` 資料都已經通過 `reserve` 操作構造了 `fetchRequest` 結構並傳給 `fetch`,接下來就是等待資料的到達,資料下載成功後,會呼叫 `queue` 物件的 `deliver` 方法進行傳遞,包括 `queue.DeliverBodies` 和 `queue.DeliverReceipts`。這兩個方法都以不同的引數呼叫了 `queue.deliver` 方法: ①:如果下載的資料數量為 0,則把所有此節點此次下載的資料標記為「缺失」 ```go if results == 0 { for _, header := range request.Headers { request.Peer.MarkLacking(header.Hash()) } } ``` ②:迴圈處理資料,通過呼叫`reconstruct` 填充 `resultCache[index]` 中的相應的欄位 ```go for i, header := range request.Headers { ... if err := reconstruct(header, i, q.resultCache[index]); err != nil { failure = err break } } ``` ③:驗證`resultCache` 中的資料,其對應的 `request.Headers` 中的 `header` 都應為 nil,若不是則說明驗證未通過,需要假如到task queue重新下載 ```go for _, header := range request.Headers { if header != nil { taskQueue.Push(header, -int64(header.Number.Uint64())) } } ``` ④:如果有資料被驗證通過且寫入 `queue.resultCache` 中了(`accepted` > 0),傳送 `queue.active` 訊息。`Results` 會等待這這個訊號。 ---- #### Results 當(header、body、receipt)都下載完,就要將區塊寫入到資料庫了,`queue.Results` 就是用來返回所有目前已經下載完成的資料,它在 `Downloader.processFullSyncContent` 和 `Downloader.processFastSyncContent` 中被呼叫。程式碼比較簡單就不多說了。 到此為止`queue`物件就分析的差不多了。 ----- ## 同步headers ### fetchHeaders 同步`headers` 是是由函式`fetchHeaders`來完成的。 `fetchHeaders`的大致思想: 同步`header`的資料會被填充到`skeleton`,每次從遠端節點獲取區塊資料最大為`MaxHeaderFetch`(192),所以要獲取的區塊資料如果大於192 ,會被分成組,每組`MaxHeaderFetch`,剩餘的不足192個的不會填充進`skeleton`,具體步驟如下圖所示: ![image-20201219111103965](https://tva1.sinaimg.cn/large/0081Kckwgy1glszpqzeuwj313u0msq73.jpg) 此種方式可以**避免從同一節點下載過多錯誤資料**,如果我們連線到了一個惡意節點,它可以創造一個鏈條很長且`TD`值也非常高的區塊鏈資料。如果我們的區塊從 0 開始全部從它那同步,也就下載了一些根本不被別人承認的資料。如果我只從它那同步 `MaxHeaderFetch` 個區塊,然後發現這些區塊無法正確填充我之前的 `skeleton`(可能是 `skeleton` 的資料錯了,或者用來填充 `skeleton` 的資料錯了),就會丟掉這些資料。 接下來檢視下程式碼如何實現: ①:發起獲取`header`的請求 如果是下載`skeleton`,則會從高度 `from+MaxHeaderFetch-1` 開始(包括),每隔 `MaxHeaderFetch-1` 的高度請求一個 `header`,最多請求 `MaxSkeletonSize` 個。如果不是的話,則要獲取完整的`headers` 。 ②:等待並處理`headerCh`中的`header`資料 2.1 確保遠端節點正在返回我們需要填充`skeleton`所需的`header` ```go if packet.PeerId() != p.id { log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) break } ``` 2.2 如果`skeleton`已經下載完畢,則需要繼續填充`skeleton` ```go if packet.Items() == 0 && skeleton { skeleton = false getHeaders(from) continue } ``` 2.3 整個`skeleton`填充完成,並且沒有要獲取的`header`了,要通知`headerProcCh`全部完成 ```go if packet.Items() == 0 { //下載pivot時不要中止標頭的提取 if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled } } //完成Pivot操作(或不進行快速同步),並且沒有標頭檔案,終止該過程 p.log.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil case <-d.cancelCh: return errCanceled } } ``` 2.4 當`header`有資料並且是在獲取`skeleton`的時候,呼叫`fillHeaderSkeleton`填充`skeleton` ```go if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) return errInvalidChain } headers = filled[proced:] from += uint64(proced) } ``` 2.5 如果當前處理的不是 `skeleton`,表明區塊同步得差不多了,處理尾部的一些區塊 判斷本地的主鏈高度與新收到的 header 的最高高度的高度差是否在 `reorgProtThreshold` 以內,如果不是,就將高度最高的 `reorgProtHeaderDelay` 個 header 丟掉。 ```go if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() { delay := reorgProtHeaderDelay if delay > n { delay = n } headers = headers[:n-delay] } ``` 2.6 如果還有 `header` 未處理,發給 `headerProcCh` 進行處理,`Downloader.processHeaders` 會等待這個 channel 的訊息並進行處理; ```go if len(headers) > 0 { ... select { case d.headerProcCh <- headers: case <-d.cancelCh: return errCanceled } from += uint64(len(headers)) getHeaders(from) } ``` 2.7 如果沒有傳送標頭,或者所有標頭等待 `fsHeaderContCheck` 秒,再次呼叫 `getHeaders` 請求區塊 ```go p.log.Trace("All headers delayed, waiting") select { case <-time.After(fsHeaderContCheck): getHeaders(from) continue case <-d.cancelCh: return errCanceled } ``` 這段程式碼後來才加上的,其 commit 的記錄在[這裡](https://github.com/ethereum/go-ethereum/commit/6ee3b26f447459d3f3a316dbb572e461a273e193#diff-c2fa15e758e986688c646459d8970a50),而 「pull request」 在[這裡](https://github.com/ethereum/go-ethereum/pull/17839)。從 「pull request」 中作者的解釋我們可以瞭解這段程式碼的邏輯和功能:這個修改主要是為了解決經常出現的 「invalid hash chain」 錯誤,出現這個錯誤的原因是因為在我們上一次從遠端節點獲取到一些區塊並將它們加入到本地的主鏈的過程中,遠端節點發生了 reorg 操作(參見[這篇文章](https://yangzhe.me/2019/03/24/ethereum-blockchain/)裡關於「主鏈與側鏈」的介紹 );當我們再次根據高度請求新的區塊時,對方返回給我們的是它的新的主鏈上的區塊,而我們沒有這個鏈上的歷史區塊,因此在本地寫入區塊時就會返回 「invalid hash chain」 錯誤。 要想發生 「reorg」 操作,就需要有新區塊加入。在以太坊主網上,新產生一個區塊的間隔是 10 秒到 20 秒左右。一般情況下,如果僅僅是區塊資料,它的同步速度還是很快的,每次下載也有最大數量的限制。所以在新產生一個區塊的這段時間裡,足夠同步完成一組區塊資料而對方節點不會發生 「reorg」 操作。但是注意剛才說的「僅僅是區塊資料」的同步較快,**state 資料的同步就非常慢了**。簡單來說在完成同步之前可能會有多個 「pivot」 區塊,這些區塊的 state 資料會從網路上下載,這就大大拖慢了整個區塊的同步速度,使得本地在同步一組區塊的同時對方發生 「reorg」 操作的機率大大增加。 作者認為這種情況下發生的 「reorg」 操作是由新產生的區塊的競爭引起的,所以最新的幾個區塊是「不穩定的」,如果本次同步的區塊數量較多(也就是我們同步時消耗的時間比較長)(在這裡「本次同步的區數數量較多」的表現是新收到的區塊的最高高度與本地資料庫中的最高高度的差距大於 `reorgProtThreshold`),那麼在同步時可以先避免同步最新區塊,這就是 `reorgProtThreshold` 和 `reorgProtHeaderDelay` 這個變數的由來。 至此,`Downloader.fetchHeaders` 方法就結束了,所有的區塊頭也就同步完成了。在上面我們提到填充`skeleton`的時候,是由`fillHeaderSkeleton`函式來完成,接下來就要細講填充`skeleton`的細節。 ------ ### fillHeaderSkeleton 首先我們知道以太坊在同步區塊時,先確定要下載的區塊的高度區間,然後將這個區間按 `MaxHeaderFetch` 切分成很多組,每一組的最後一個區塊組成了 「skeleton」(最後一組不滿 `MaxHeaderFetch` 個區塊不算作一組)。不清楚的可以檢視上面的圖。 ①:將一批`header`檢索任務新增到佇列中,以填充`skeleton`。 這個函式參照上面**queue詳解**的分析 > func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {} ②:呼叫`fetchParts` 獲取`headers`資料 `fetchParts`是很核心的函式,下面的`Fetchbodies`和`FetchReceipts`都會呼叫。先來大致看一下`fetchParts`的結構: ```go func (d *Downloader) fetchParts(...) error { ... for { select { case <-d.cancelCh: case packet := <-deliveryCh: case cont := <-wakeCh: case <-ticker.C: case <-update: ... } } ``` 簡化下來就是這 5 個`channel`在處理,前面 4 個`channel`負責迴圈等待訊息,`update`用來等待其他 4 個`channel`的通知來處理邏輯,先分開分析一個個的`channel`。 2.1 deliveryCh 傳遞下載的資料 `deliveryCh` 作用就是傳遞下載的資料,當有資料被真正下載下來時,就會給這個 `channel` 發訊息將資料傳遞過來。這個 channel 對應的分別是:`d.headerCh`、`d.bodyCh`、`d.receiptCh`,而這三個 `channel` 分別在以下三個方法中被寫入資料:`DeliverHeaders`、`DeliverBodies`、`DeliverReceipts`。 看下`deliveryCh`如何處理資料: ```go case packet := <-deliveryCh: if peer := d.peers.Peer(packet.PeerId()); peer != nil { accepted, err := deliver(packet)//傳遞接收到的資料塊並檢查鏈有效性 if err == errInvalidChain { return err } if err != errStaleDelivery { setIdle(peer, accepted) } switch { case err == nil && packet.Items() == 0: ... case err == nil: ... } } select { case update <- struct{}{}: default: } ``` 收到下載資料後判斷節點是否有效,如果節點沒有被移除,則會通過`deliver`傳遞接收到的下載資料。如果沒有任何錯誤,則通知`update`處理。 要注意`deliver`是一個回撥函式,它呼叫了 queue 物件的 Deliver 方法:`queue.DeliverHeaders`、`queue.DeliverBodies`、`queue.DeliverReceipts`,在收到下載資料就會呼叫此回撥函式(**queue相關函式分析參照queue詳解部分**)。 在上面處理錯誤部分,有一個`setIdle`函式,它也是回撥函式,其實現都是呼叫了 `peerConnection` 物件的相關方法:`SetHeadersIdle`、`SetBodiesIdle`、`SetReceiptsIdle`。它這個函式是指某些節點針對某類資料是空閒的,比如`header`、`bodies`、`receipts`,如果需要下載這幾類資料,就可以從空閒的節點下載這些資料。 2.2 `wakeCh` 喚醒`fetchParts` ,下載新資料或下載已完成 ```GO case cont := <-wakeCh: if !cont { finished = true } select { case update <- struct{}{}: default: } ``` 首先我們通過呼叫fetchParts傳遞的引數知道,`wakeCh` 的值其實是 `queue.headerContCh`。在 `queue.DeliverHeaders` 中發現所有需要下戴的 header 都下載完成了時,才會傳送 false 給這個 channel。`fetchParts` 在收到這個訊息時,就知道沒有 header 需要下載了。程式碼如下: ```go func (q *queue) DeliverHeaders(......) (int, error) { ...... if len(q.headerTaskPool) == 0 { q.headerContCh <- false } ...... } ``` 同樣如此,`body`和`receipt`則是`bodyWakeCh`和`receiptWakeCh`,在 `processHeaders` 中,如果所有 `header` 已經下載完成了,那麼傳送 `false` 給這兩個 `channel`,通知它們沒有新的 `header` 了。 `body` 和 `receipt` 的下載依賴於 `header`,需要 `header` 先下載完成才能下載,所以對於下戴 `body` 或 `receipt` 的 `fetchParts` 來說,收到這個 `wakeCh` 就代表不會再有通知讓自己下載資料了. ```go func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { for { select { case headers := <-d.headerProcCh: if len(headers) == 0 { for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } ... } ... for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } } } } ``` 2.3 ticker 負責週期性的啟用 `update`進行訊息處理 ```go case <-ticker.C: select { case update <- struct{}{}: default: } ``` 2.4 `update` (處理此前幾個`channel`的資料)(**重要**) 2.4.1 判斷是否有效節點,並獲取超時資料的資訊 獲取超時資料的節點ID和資料數量,如果大於兩個的話,就將這個節點設定為空閒狀態(`setIdle`),小於兩個的話直接斷開節點連線。 `expire` 是一個回撥函式,會返回當前所有的超時資料資訊。這個函式的實際實現都是呼叫了 `queue` 物件的 `Expire` 方法:`ExpireHeaders`、`ExpireBodies`、`ExpireReceipts`,此函式會統計當前正在下載的資料中,起始時間與當前時間的差距超過給定閾值(`downloader.requestTTL` 方法的返回值)的資料,並將其返回。 ```go if d.peers.Len() == 0 { return errNoPeers } for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { if fails > 2 { ... setIdle(peer, 0) } else { ... if d.dropPeer == nil { } else { d.dropPeer(pid) .... } } } ``` 2.4.2 處理完超時資料,判斷是否還有下載的資料 如果沒有其他可下載的內容,請等待或終止,這裡`pending()`和`inFlight()`都是回撥函式,`pending`分別對應了`queue.PendingHeaders`、`queue.PendingBlocks`、`queue.PendingReceipts`,用來返回各自要下載的任務數量。`inFlight()`分別對應了`queue.InFlightHeaders`、`queue.InFlightBlocks`、`queue.InFlightReceipts`,用來返回正在下載的資料數量。 ```go if pending() == 0 { if !inFlight() && finished { ... return nil } break } ``` 2.4.3 使用空閒節點,呼叫`fetch`函式傳送資料請求 `Idle()`回撥函式在上面已經提過了,`throttle()`回撥函式則分別對`queue.ShouldThrottleBlocks`、`queue.ShouldThrottleReceipts`,用來表示是否應該下載`bodies`或者`receipts`。 `reserve`函式分別對應`queue.ReserveHeaders`、`queue.ReserveBodies`、`queue.ReserveReceipts`,用來從從下載任務中選取一些可以下載的任務,並構造一個 `fetchRequest` 結構。它還返回一個 `process` 變數,標記著是否有空的資料正在被處理。比如有可能某區塊中未包含任何一條交易,因此它的 `body` 和 `receipt` 都是空的,這種資料其實是不需要下載的。在 `queue` 物件的 `Reserve` 方法中,會對這種情況進行識別。如果遇到空的資料,這些資料會被直接標記為下載成功。在方法返回時,就將是否發生過「直接標記為下載成功」的情況返回。 `capacity`回撥函式分別對應`peerConnection.HeaderCapacity`、`peerConnection.BlockCapacity`、`peerConnection.ReceiptCapacity`,用來決定下載需要請求資料的個數。 `fetch`回撥函式分別對應`peer.FetchHeaders`、`peer.Fetchbodies`、`peer.FetchReceipts`,用來發送獲取各類資料的請求。 ```go progressed, throttled, running := false, false, inFlight() idles, total := idle() for _, peer := range idles { if throttle() { ... } if pending() == 0 { break } request, progress, err := reserve(peer, capacity(peer)) if err != nil { return err } if progress { progressed = true } if request == nil { continue } if request.From > 0 { ... } ... if err := fetch(peer, request); err != nil { ... } if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable } ``` 簡單來概括這段程式碼就是:使用空閒節點下載資料,判斷是否需要暫停,或者資料是否已經下載完成;之後選取資料進行下載;最後,如果沒有遇到空塊需要下載、且沒有暫停下載和所有有效節點都空閒和確實有資料需要下載,但下載沒有執行起來,就返回 `errPeersUnavailable` 錯誤。 到此為止`fetchParts`函式就分析的差不多了。裡面涉及的跟`queue.go`相關的一些函式都在**queue詳解**小節裡介紹了。 ------ ### processHeaders 通過`headerProcCh`接收`header`資料,並處理的過程是在`processHeaders`函式中完成的。整個處理過程集中在:`case headers := <-d.headerProcCh中`: ①:如果`headers`的長度為0 ,則會有以下操作: 1.1 通知所有人`header`已經處理完畢 ```go for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: } } ``` 1.2 若沒有檢索到任何`header`,說明他們的`TD`小於我們的,或者已經通過我們的`fetcher`模組進行了同步。 ```go if d.mode != LightSync { head := d.blockchain.CurrentBlock() if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { return errStallingPeer } } ``` 1.3 如果是`fast`或者`light` 同步,確保傳遞了`header` ```GO if d.mode == FastSync || d.mode == LightSync { head := d.lightchain.CurrentHeader() if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { return errStallingPeer } } ``` ②:如果`headers`的長度大於 0 2.1 如果是fast或者light 同步,呼叫**ightchain.InsertHeaderChain()**寫入`header`到`leveldb`資料庫; ```go if d.mode == FastSync || d.mode == LightSync { .... d.lightchain.InsertHeaderChain(chunk, frequency); .... } ``` 2.2 如果是`fast`或者`full sync`模式,則呼叫 d.queue.Schedule進行內容(body和receipt)檢索。 ```go if d.mode == FullSync || d.mode == FastSync { ... inserts := d.queue.Schedule(chunk, origin) ... } ``` ③:如果找到更新的塊號,則要發訊號通知新任務 ```go if d.syncStatsChainHeight < origin { d.syncStatsChainHeight = origin - 1 } for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { case ch <- true: default: } } ``` 到此處理`Headers`的分析就完成了。 ------ ## 同步bodies 同步`bodies` 則是由`fetchBodies`函式完成的。 ### fetchBodies 同步bodies的過程跟同步header類似,大致講下步驟: 1. 呼叫`fetchParts` 2. `ReserveBodies`()從`bodyTaskPool`中取出要同步的`body`; 3. 呼叫`fetch`,也就是呼叫這裡的`FetchBodies`從節點獲取`body`,傳送`GetBlockBodiesMsg`訊息; 4. 收到`bodyCh`的資料後,呼叫`deliver`函式,將Transactions和`Uncles`寫入`resultCache`。 ------- ## 同步Receipts ### fetchReceipts 同步`receipts`的過程跟同步`header`類似,大致講下步驟: 1. 呼叫`fetchParts`() 2. `ReserveBodies`()從`ReceiptTaskPool`中取出要同步的`Receipt` 3. 呼叫這裡的`FetchReceipts`從節點獲取`receipts`,傳送`GetReceiptsMsg`訊息; 4. 收到`receiptCh`的資料後,呼叫`deliver`函式,將`Receipts`寫入`resultCache`。 ------- ## 同步狀態 這裡我們講兩種模式下的狀態同步: - **fullSync**: `processFullSyncContent`,`full`模式下`Receipts`沒有快取到`resultCache`中,直接先從快取中取出`body`資料,然後執行交易生成狀態,最後寫入區塊鏈。 - **fastSync**:`processFastSyncContent`:fast模式的Receipts、Transaction、Uncles都在resultCache中,所以還需要下載"state",進行校驗,再寫入區塊鏈。 接下來大致的討論下這兩種方式。 ### processFullSyncContent ```go func (d *Downloader) processFullSyncContent() error { for { results := d.queue.Results(true) ... if err := d.importBlockResults(results); err != nil ... } } ``` ```go func (d *Downloader) importBlockResults(results []*fetchResult) error { ... select { ... blocks := make([]*types.Block, len(results)) for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } if index, err := d.blockchain.InsertChain(blocks); err != nil { .... } ``` 直接從`result`中獲取資料並生成`block`,直接插入區塊鏈中,就結束了。 ---- ### processFastSyncContent fast模式同步狀態內容比較多,大致也就如下幾部分,我們開始簡單分析以下。 ①:下載最新的區塊狀態 ```go sync := d.syncState(latest.Root) ``` 我們直接用一張圖來表示整個大致流程: ![image-20201223100153241](https://tva1.sinaimg.cn/large/0081Kckwgy1glxk6y6zhjj31180jstcl.jpg) 具體的程式碼讀者自己翻閱,大致就是這麼個簡單過程。 ②:計算出pivot塊 `pivot`為`latestHeight - 64`,呼叫`splitAroundPivot`()方法以pivot為中心,將`results`分為三個部分:`beforeP`,`P`,`afterP`; ```go pivot := uint64(0) if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { pivot = height - uint64(fsMinFullBlocks) } ``` ```go P, beforeP, afterP := splitAroundPivot(pivot, results) ``` ③: 對`beforeP`的部分呼叫`commitFastSyncData`,將`body`和`receipt`都寫入區塊鏈 ```go d.commitFastSyncData(beforeP, sync); ``` ④:對**P**的部分更新狀態資訊為`P block`的狀態,把**P**對應的**result**(包含**body**和**receipt**)呼叫**commitPivotBlock**插入本地區塊鏈中,並呼叫**FastSyncCommitHead**記錄這個**pivot**的**hash**值,存在**downloader**中,標記為快速同步的最後一個區塊**hash**值; ```go if err := d.commitPivotBlock(P); err != nil { return err } ``` ⑤:對`afterP`呼叫`d.importBlockResults`,將`body`插入區塊鏈,而不插入`receipt`。因為是最後 64 個區塊,所以此時資料庫中只有`header`和`body`,沒有`receipt`和狀態,要通過`fullSync`模式進行最後的同步。 ```go if err := d.importBlockResults(afterP); err != nil { return err } ``` 到此為止整個Downloader同步完成了。 ## 參考 > https://mindcarver.cn > > https://github.com/ethereum/go-ethereum/pull/1889 > > https://yangzhe.me/2019/05/09/ethereum-downloader/#fet