1. 程式人生 > >死磕以太坊原始碼分析之Fetcher同步

死磕以太坊原始碼分析之Fetcher同步

> 死磕以太坊原始碼分析之Fetcher同步 ## Fetcher 功能概述 區塊資料同步分為被動同步和主動同步: - 被動同步是指本地節點收到其他節點的一些**廣播**的訊息,然後請求區塊資訊。 - 主動同步是指節點主動向其他節點請求區塊資料,比如geth剛啟動時的syning,以及執行時定時和相鄰節點同步 `Fetcher`負責被動同步,主要做以下事情: - 收到完整的block廣播訊息(NewBlockMsg) - 收到blockhash廣播訊息(NewBlockHashesMsg) 這兩個訊息又是分別由 `peer.AsyncSendNewBlockHash` 和 `peer.AsyncSendNewBlock` 兩個方法發出的,這兩個方法只有在礦工挖到新的區塊時才會被呼叫: ```go // 訂閱本地挖到新的區塊的訊息 func (pm *ProtocolManager) minedBroadcastLoop() { for obj := range pm.minedBlockSub.Chan() { if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { pm.BroadcastBlock(ev.Block, true) // First propagate block to peers pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } } ``` ```go func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { ...... if propagate { ...... for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) //傳送區塊資料 } } if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.AsyncSendNewBlockHash(block) //傳送區塊雜湊 } } } ``` 所以,當某個礦工產生了新的區塊、並將這個新區塊廣播給其它節點,而**其它遠端節點**收到廣播的訊息時,才會用到 `fetcher` 模組去同步這些區塊。 ------- ## fetcher的狀態欄位 在 `Fetcher` 內部對區塊進行同步時,會被分成如下幾個階段,並且每個階段都有一個狀態欄位與之對應,用來記錄這個階段的資料: - `Fetcher.announced`:此階段代表節點宣稱產生了新的區塊(這個新產生的區塊不一定是自己產生的,也可能是同步了其它節點新產生的區塊),`Fetcher` 物件將相關資訊放到 `Fetcher.announced` 中,等待下載。 - `Fetcher.fetching`:此階段代表之前「announced」的區塊正在被下載。 - `Fetcher.fetched`:代表區塊的 `header` 已下載成功,現在等待下載 `body`。 - `Fetcher.completing`:代表 `body` 已經發起了下載,正在等待 `body` 下載成功。 - `Fetcher.queued`:代表 `body` 已經下載成功。因此一個區塊的資料:`header` 和 body 都已下載完成,此區塊正在等待寫入本地資料庫。 ## Fetcher 同步區塊雜湊 而新產生區塊時,會使用訊息 `NewBlockHashesMsg` 和 `NewBlockMsg` 對其進行傳播。因此 `Fetcher` 物件也是從這兩個訊息處發現新的區塊資訊的。先來看同步區塊雜湊的過程。 ```go case msg.Code == NewBlockHashesMsg: var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } // Mark the hashes as present at the remote node // 將hash 標記存在於遠端節點上 for _, block := range announces { p.MarkBlock(block.Hash) } // Schedule all the unknown hashes for retrieval 檢索所有未知雜湊 unknown := make(newBlockHashesData, 0, len(announces)) for _, block := range announces { if !pm.blockchain.HasBlock(block.Hash, block.Number) { unknown = append(unknown, block) // 本地不存在的話就扔到unkonwn裡面 } } for _, block := range unknown { pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) } ``` 先將接收的雜湊標記在遠端節點上,然後去本地檢索是否有這個雜湊,如果本地資料庫不存在的話,就放到`unknown`裡面,然後通知本地的`fetcher`模組再去遠端節點上請求此區塊的`header`和`body`。 接下來進入到`fetcher.Notify`方法中。 ```GO func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { block := &announce{ hash: hash, number: number, time: time, origin: peer, fetchHeader: headerFetcher, fetchBodies: bodyFetcher, } select { case f.notify <- block: return nil case <-f.quit: return errTerminated } ``` 它構造了一個 `announce` 結構,並將其傳送給了 `Fetcher.notify` 這個 channel。注意 `announce` 這個結構裡帶著下載 header 和 body 的方法: `fetchHeader` 和 `fetchBodies` 。這兩個方法在下面的過程中會講到。 接下來我們進入到`fetcher.go`的loop函式中,找到`notify`,分以下幾個內容: ①:校驗防止Dos攻擊(限制為256個) ```go count := f.announces[notification.origin] + 1 if count > hashLimit { log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) propAnnounceDOSMeter.Mark(1) break } ``` ②:新來的塊號必須滿足 $chainHeight - blockno < 7$ 或者 $blockno - chainHeight < 32$ ```go if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { ... } } ``` ③:準備下載`header`的`fetching`中存在此雜湊則跳過 ```GO if _, ok := f.fetching[notification.hash]; ok { break } ``` ④:準備下載`body`的`completing`中存在此雜湊也跳過 ```go if _, ok := f.completing[notification.hash]; ok { break } ``` ⑤:當確定`fetching`和`completing`不存在此區塊雜湊時,則把此區塊雜湊放入到`announced`中,準備拉取`header`和`body`。 ```GO f.announced[notification.hash] = append(f.announced[notification.hash], notification) ``` ⑥:如果 `Fetcher.announced` 中只有剛才新加入的這一個區塊雜湊,那麼呼叫 `Fetcher.rescheduleFetch` 重新設定變數 `fetchTimer` 的週期 ```go if len(f.announced) == 1 { f.rescheduleFetch(fetchTimer) } ``` ### 拉取header 接下來就是到`fetchTimer.C`函式中:進行拉取header的操作了,具體步驟如下: ①:選擇要下載的區塊,從 `announced` 轉移到 `fetching` 中 ```GO for hash, announces := range f.announced { if time.Since(announces[0].time) > arriveTimeout-gatherSlack { // 隨機挑一個進行fetching announce := announces[rand.Intn(len(announces))] f.forgetHash(hash) // If the block still didn't arrive, queue for fetching if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.fetching[hash] = announce // } } } ``` ②:傳送下載 `header` 的請求 ```GO //傳送所有的header請求 for peer, hashes := range request { log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes go func() { if f.fetchingHook != nil { f.fetchingHook(hashes) } for _, hash := range hashes { headerFetchMeter.Mark(1) fetchHeader(hash) } }() } ``` 現在我們再回到`f.notify`函式中,找到`p.RequestOneHeader`,傳送`GetBlockHeadersMsg`給遠端節點,然後遠端節點再通過`case msg.Code == GetBlockHeadersMsg`進行處理,本地區塊鏈會返回headers,然後再發送回去。 ```GO origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) ... p.SendBlockHeaders(headers) ``` 這時候我們請求的`headers`被遠端節點給傳送回來了,又是通過新的訊息`BlockHeadersMsg`來傳遞的,當請求的 `header` 到來時,會通過兩種方式來過濾header : 1. `Fetcher.FilterHeaders` 通知 `Fetcher` 物件 ```go case msg.Code == BlockHeadersMsg: .... filter := len(headers) == 1 if filter { headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) } ``` 2.`downloader.DeliverHeaders` 通知`downloader`物件 ```go if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) ... } ``` `downloader`相關的放在接下的文章探討。繼續看`FilterHeaders`: ```go filter := make(chan *headerFilterTask) select { case f.headerFilter <- filter: ① .... select { case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: ② ... select { case task := <-filter: ③ return task.headers ... } ``` 主要分為3個步驟: 1. 先發一個通訊用的 `channel` 給 `headerFilter` 2. 將要過濾的 `headerFilterTask` 傳送給 `filter` 3. 檢索過濾後剩餘的標題 主要的處理步驟還是在`loop`函式中的`filter := <-f.headerFilter`,在探討處理前,先了解三個引數的含義: - `unknown:`未知的header - `incomplete:`**header**拉取完成,但是**body**還沒有拉取 - `complete:`**header**和**body**都拉取完成,一個完整的塊,可匯入到資料庫 接下來正式進入到`for _, header := range task.headers {}`迴圈中: **這是第一段重要的迴圈** ①:判斷是否是在`fetching`中的header,並且不是其他同步演算法的`header` ```go if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { ..... } ``` ②:如果傳遞的`header`與承諾的`number`不匹配,刪除`peer` ```go if header.Number.Uint64() != announce.number { f.dropPeer(announce.origin) f.forgetHash(hash) } ``` ③:判斷此區塊在本地是否已存在,如果不存在且只有`header`(空塊),直接放入`complete`以及`f.completing`中,否則就放入到`incomplete`中等待同步`body`。 ```go if f.getBlock(hash) == nil { announce.header = header announce.time = task.time if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { ... block := types.NewBlockWithHeader(header) block.ReceivedAt = task.time complete = append(complete, block) f.completing[hash] = announce continue } incomplete = append(incomplete, announce) // 否則新增到需要完成拉取body的列表中 ``` ④:如果`f.fetching`中不存在此雜湊,就放入到`unkown`中 ```go else { // Fetcher doesn't know about it, add to the return list |fetcher 不認識的放到unkown中 unknown = append(unknown, header) } ``` ⑤:之後再把`Unknown`的`header`再通知fetcher繼續過濾 ```go select { case filter <- &headerFilterTask{headers: unknown, time: task.time}: case <-f.quit: return } ``` 接著就是**進入到第二個迴圈**,要準備拿出incomplete裡的雜湊,進行同步body的同步 ```go for _, announce := range incomplete { hash := announce.header.Hash() if _, ok := f.completing[hash]; ok { continue } f.fetched[hash] = append(f.fetched[hash], announce) if len(f.fetched) == 1 { f.rescheduleComplete(completeTimer) } } ``` 如果`f.completing`中存在,就表明已經在開始同步`body`了,直接跳過,否則把這個雜湊放入到`f.fetched`,表示`header`同步完畢,準備`body`同步,由`f.rescheduleComplete(completeTimer)`完成。最後是安排只有`header`的區塊進行匯入操作. ```GO for _, block := range complete { if announce := f.completing[block.Hash()]; announce != nil { f.enqueue(announce.origin, block) } } ``` 重點分析`completeTimer.C`,同步`body`的操作,這步完成就是要準備區塊匯入到資料庫流程了。 ### 拉取body 進入`completeTimer.C`,從f.fetched獲取雜湊,如果本地區塊鏈查不到的話就把這個雜湊放入到`f.completing`中,再迴圈進行`fetchBodies`,整個流程就結束了,程式碼大致如下: ```go case <-completeTimer.C: ... for hash, announces := range f.fetched { .... if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.completing[hash] = announce } } for peer, hashes := range request { ... go f.completing[hashes[0]].fetchBodies(hashes) } ... ``` 關鍵的拉取`body`函式: `p.RequestBodies`,傳送`GetBlockBodiesMsg`訊息同步`body`。回到`handler`裡面去檢視對應的訊息: ```go case msg.Code == GetBlockBodiesMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { return err } var ( hash common.Hash bytes int bodies []rlp.RawValue ) for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { ... if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { bodies = append(bodies, data) bytes += len(data) } } return p.SendBlockBodiesRLP(bodies) ``` `softResponseLimit`返回的`body`大小最大為$2 * 1024 * 1024$,`MaxBlockFetch`表示每個請求最多128個`body`。 之後直接通過`GetBodyRLP`返回資料通過`SendBlockBodiesRLP`發回給節點。 節點將會接收到新訊息:`BlockBodiesMsg`,進入檢視: ```go // 過濾掉filter請求的body 同步,其他的都交給downloader filter := len(transactions) > 0 || len(uncles) > 0 if filter { transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) } if len(transactions) > 0 || len(uncles) > 0 || !filter { err := pm.downloader.DeliverBodies(p.id, transactions, uncles) ... } ``` 過濾掉`filter`請求的`body` 同步,其他的都交給`downloader`,`downloader`部分之後的篇章講。進入到`FilterBodies`: ```go filter := make(chan *bodyFilterTask) select { case f.bodyFilter <- filter: ① case <-f.quit: return nil, nil } // Request the filtering of the body list // 請求過濾body 列表 select { ② case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}: case <-f.quit: return nil, nil } // Retrieve the bodies remaining after filtering select { ③: case task := <-filter: return task.transactions, task.uncles ``` 主要分為3個步驟: 1. 先發一個通訊用的 `channel` 給 `bodyFilter` 2. 將要過濾的 `bodyFilterTask` 傳送給 `filter` 3. 檢索過濾後剩餘的`body` 現在進入到`case filter := <-f.bodyFilter`裡面,大致做了以下幾件事: ①:首先從f.completing中獲取要同步body的雜湊 ```go for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { for hash, announce := range f.completing { ... } } ``` ②:然後從f.queued去查這個雜湊是不是已經獲取了body,如果沒有並滿足條件就建立一個完整block ```go if f.queued[hash] == nil { txnHash := types.DeriveSha(types.Transactions(task.transactions[i])) uncleHash := types.CalcUncleHash(task.uncles[i]) if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer { matched = true if f.getBlock(hash) == nil { block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) block.ReceivedAt = task.time blocks = append(blocks, block) } } ``` ③:最後對完整的塊進行匯入 ```go for _, block := range blocks { if announce := f.completing[block.Hash()]; announce != nil { f.enqueue(announce.origin, block) } } ``` 最後用一張粗略的圖來大概的描述一下整個同步區塊雜湊的流程: ![image-20201203090304059](https://tva1.sinaimg.cn/large/0081Kckwgy1glae3zccpdj30wm0t6q67.jpg) ---------- 同步區塊雜湊的最終會走到`f.enqueue`裡面,這個也是**同步區塊**最重要的要做的一件事,下文就會講到。 ## Fetcher 同步區塊 分析完上面比較複雜的同步區塊雜湊過程,接下來就要分析比較簡單的同步區塊過程。從`NewBlockMsg`開始: 主要做兩件事: ①:`fetcher`模組匯入遠端節點發過來的區塊 ```GO pm.fetcher.Enqueue(p.id, request.Block) ``` ②:**主動同步**遠端節點 ```GO if _, td := p.Head(); trueTD.Cmp(td) > 0 { p.SetHead(trueHead, trueTD) currentBlock := pm.blockchain.CurrentBlock() if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { go pm.synchronise(p) } } ``` 主動同步由`Downloader`去處理,我們這篇只討論`fetcher`相關。 ### 區塊入佇列 ```go pm.fetcher.Enqueue(p.id, request.Block) ``` ```go case op := <-f.inject: propBroadcastInMeter.Mark(1) f.enqueue(op.origin, op.block) ``` 正式進入將區塊送進`queue`中,主要做了以下幾件事: ①: 確保新加`peer`沒有導致`DOS`攻擊 ```go count := f.queues[peer] + 1 if count > blockLimit { log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) propBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } ``` ②:丟棄掉過去的和比較老的區塊 ```go if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { f.forgetHash(hash) } ``` ③:安排區塊匯入 ```go if _, ok := f.queued[hash]; !ok { op := &inject{ origin: peer, block: block, } f.queues[peer] = count f.queued[hash] = op f.queue.Push(op, -int64(block.NumberU64())) if f.queueChangeHook != nil { f.queueChangeHook(op.block.Hash(), true) } log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size()) } ``` 到此為止,已經將區塊送入到`queue`中,接下來就是要回到`loop`函式中去處理`queue`中的區塊。 ### 區塊入庫 loop函式在處理佇列中的區塊主要做了以下事情: 1. 判斷佇列是否為空 2. 取出區塊雜湊,並且和本地鏈進行比較,如果太高的話,就暫時不匯入 3. 最後通過f.insert將區塊插入到資料庫。 程式碼如下: ```GO height := f.chainHeight() for !f.queue.Empty() { op := f.queue.PopItem().(*inject) hash := op.block.Hash() ... number := op.block.NumberU64() if number > height+1 { f.queue.Push(op, -int64(number)) ... break } if number+maxUncleDist < height || f.getBlock(hash) != nil { f.forgetBlock(hash) continue } f.insert(op.origin, op.block) //匯入塊 } ``` 進入到`f.insert`中,主要做了以下幾件事: ①:判斷區塊的父塊是否存在,不存在則中斷插入 ```go parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) return } ``` ②: 快速驗證header,**並在傳遞時廣播該塊** ```go switch err := f.verifyHeader(block.Header()); err { case nil: propBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) ``` ③:執行真正的插入邏輯 ```GO if _, err := f.insertChain(types.Blocks{block}); err != nil { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } ``` ④:匯入成功廣播此塊 ```go go f.broadcastBlock(block, false) ``` 真正做區塊入庫的是f.insertChain,這裡會呼叫blockchain模組去操作,具體細節會後續文章講述,到此為止Fether模組的同步就到此結束了,下面是同步區塊的流程圖: ![image-20201203090327173](https://tva1.sinaimg.cn/large/0081Kckwgy1glae40anr7j30ps0mkn0t.jpg) ------- ## 參考 > https://mindcarver.cn > > https://github.com/blockcha