Ethereum Source Code Analysis (11): Notes on PoW, eth's Current Consensus Algorithm

### Analysis of the eth consensus algorithm, starting from a block mined by the local node

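In production today, mining is certainly not done on the CPU, so it takes the `remoteAgent` form: a mining rig requests the node's current block-production task over the network, computes a hash that satisfies that block's difficulty, and submits the result to the node. The submission lands in the following method.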
```
func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
    a.mu.Lock()
    defer a.mu.Unlock()

    // Make sure the work submitted is present
    work := a.work[hash]
    if work == nil {
        log.Info("Work submitted but none pending", "hash", hash)
        return false
    }
    // Make sure the Engine solutions is indeed valid
    result := work.Block.Header()
    result.Nonce = nonce
    result.MixDigest = mixDigest

    if err := a.engine.VerifySeal(a.chain, result); err != nil {
        log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
        return false
    }
    block := work.Block.WithSeal(result)

    // Solutions seems to be valid, return to the miner and notify acceptance
    a.returnCh <- &Result{work, block}
    delete(a.work, hash)

    return true
}
```
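The three steps of `SubmitWork` (look up the pending work, verify the seal, hand the result over a channel) can be modelled in a few lines. This is only a toy sketch: the `work` and `agent` types, `newAgent`, `verifySeal` and the "nonce must equal `want`" rule are hypothetical stand-ins, not the go-ethereum types or the real Ethash check.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// work is a hypothetical stand-in for a pending mining task.
type work struct {
	hash string
	want uint64 // toy rule: only this nonce counts as a valid seal
}

// agent mimics the shape of RemoteAgent: a mutex, a map of pending work
// keyed by header hash, and a channel that carries accepted results.
type agent struct {
	mu       sync.Mutex
	pending  map[string]*work
	returnCh chan *work
}

// newAgent uses a buffered channel so the sketch does not need a consumer.
func newAgent() *agent {
	return &agent{pending: make(map[string]*work), returnCh: make(chan *work, 8)}
}

// verifySeal is a placeholder for engine.VerifySeal (assumption: toy rule).
func verifySeal(w *work, nonce uint64) error {
	if nonce != w.want {
		return errors.New("invalid proof-of-work")
	}
	return nil
}

// submit follows the same order as SubmitWork: look up, verify, hand over.
func (a *agent) submit(hash string, nonce uint64) bool {
	a.mu.Lock()
	defer a.mu.Unlock()

	w := a.pending[hash]
	if w == nil {
		return false // no work pending for this hash
	}
	if err := verifySeal(w, nonce); err != nil {
		return false // solution rejected
	}
	a.returnCh <- w         // pass the accepted result to the consumer
	delete(a.pending, hash) // the same work cannot be submitted twice
	return true
}

func main() {
	a := newAgent()
	a.pending["0xabc"] = &work{hash: "0xabc", want: 42}
	fmt.Println(a.submit("0xabc", 1))  // wrong nonce
	fmt.Println(a.submit("0xabc", 42)) // accepted
	fmt.Println(a.submit("0xabc", 42)) // already consumed
}
```

Deleting the entry only after the channel send mirrors the original ordering, so a rejected solution leaves the work available for another attempt.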
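`SubmitWork` verifies the proof-of-work of the submitted solution. If it is valid, the sealed block is written to a channel; the receiving end of that channel is the `wait` method in /miner/worker.go.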
```
func (self *worker) wait() {
    for {
        mustCommitNewWork := true
        for result := range self.recv {
            atomic.AddInt32(&self.atWork, -1)

            if result == nil {
                continue
            }
            block := result.Block
            work := result.Work

            // Update the block hash in all logs since it is now available and not when the
            // receipt/log of individual transactions were created.
            for _, r := range work.receipts {
                for _, l := range r.Logs {
                    l.BlockHash = block.Hash()
                }
            }
            for _, log := range work.state.Logs() {
                log.BlockHash = block.Hash()
            }
            stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
            if err != nil {
                log.Error("Failed writing block to chain", "err", err)
                continue
            }
            // check if canon block and write transactions
            if stat == core.CanonStatTy {
                // implicit by posting ChainHeadEvent
                mustCommitNewWork = false
            }
            // Broadcast the block and announce chain insertion event
            // The block is broadcast to connected peers over p2p; internally this still goes through a channel
            self.mux.Post(core.NewMinedBlockEvent{Block: block})
            var (
                events []interface{}
                logs   = work.state.Logs()
            )
            events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
            if stat == core.CanonStatTy {
                events = append(events, core.ChainHeadEvent{Block: block})
            }
            self.chain.PostChainEvents(events, logs)

            // Insert the block into the set of pending ones to wait for confirmations
            self.unconfirmed.Insert(block.NumberU64(), block.Hash())

            if mustCommitNewWork {
                self.commitNewWork()
            }
        }
    }
}
```
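The `wait` loop posts a `NewMinedBlockEvent` for the newly mined block. To see who consumes it, follow the startup call stack:
```
/geth/main.go/geth --> startNode --> utils.StartNode(stack) --> stack.Start() --> /node/node.go/Start() --> service.Start(running) --> /eth/backend.go/Start() --> /eth/handler.go/Start()
```
The core logic is in handler.go/Start():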
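```
func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    // broadcast transactions
    // Channel for broadcasting transactions. txCh is subscribed to the txpool's TxPreEvent:
    // the txpool notifies txCh of such events and the broadcast goroutine relays them.
    pm.txCh = make(chan core.TxPreEvent, txChanSize)
    // Subscription handle
    pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
    go pm.txBroadcastLoop()

    // Subscribe to mining events, which are emitted when a new block is mined. This subscription
    // uses a different mechanism from the one above; this style is the one marked Deprecated.
    // broadcast mined blocks
    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    // Mined-block broadcast goroutine: a freshly mined block must reach the network as quickly
    // as possible, and locally mined blocks are broadcast through this loop.
    go pm.minedBroadcastLoop()
    // The syncer periodically synchronises with the network, downloading hashes and blocks
    // and handling announcements.
    // start sync handlers
    go pm.syncer()
    // txsyncLoop takes care of the initial transaction sync for each new connection. When a new
    // peer appears, all currently pending transactions are relayed to it. To minimise egress
    // bandwidth usage, only one small pack is sent at a time.
    go pm.txsyncLoop()
}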
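```
Inside `pm.minedBroadcastLoop()`, a channel receives the mined-block event posted above. Following it leads to the logic that sends the block to peers over the p2p network: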
```
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)

    // If propagation is requested, send to a subset of the peer
    if propagate {
        // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the block to a subset of our peers
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.SendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise if the block is indeed in out own chain, announce it
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}
```
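The line `peers[:int(math.Sqrt(float64(len(peers))))]` in `BroadcastBlock` means that a full block goes only to the first ⌊√n⌋ of the n peers that do not have it yet. A minimal sketch of just that selection, where `propagationTargets` and the string peer IDs are illustrative names rather than anything from go-ethereum:

```go
package main

import (
	"fmt"
	"math"
)

// propagationTargets returns the first floor(sqrt(n)) entries of peers,
// i.e. the subset that would receive the full block.
func propagationTargets(peers []string) []string {
	return peers[:int(math.Sqrt(float64(len(peers))))]
}

func main() {
	peers := make([]string, 25)
	for i := range peers {
		peers[i] = fmt.Sprintf("peer-%d", i)
	}
	full := propagationTargets(peers)
	fmt.Printf("full block sent to %d of %d peers\n", len(full), len(peers))
}
```

Sending the full block to a square-root-sized subset keeps bandwidth use low; the remaining peers learn about the block from the later hash announcement and can fetch it on demand.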
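`BroadcastBlock` sends two kinds of messages: `NewBlockMsg` and `NewBlockHashesMsg`. At this point the block mined by the local node starts to spread through the p2p network. Next, look at another important method: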
```
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
    // Create the protocol manager with the base fields
    manager := &ProtocolManager{
        networkId:   networkId,
        eventMux:    mux,
        txpool:      txpool,
        blockchain:  blockchain,
        chaindb:     chaindb,
        chainconfig: config,
        peers:       newPeerSet(),
        newPeerCh:   make(chan *peer),
        noMorePeers: make(chan struct{}),
        txsyncCh:    make(chan *txsync),
        quitSync:    make(chan struct{}),
    }
    // Figure out whether to allow fast sync or not
    if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
        log.Warn("Blockchain not empty, fast sync disabled")
        mode = downloader.FullSync
    }
    if mode == downloader.FastSync {
        manager.fastSync = uint32(1)
    }
    // Initiate a sub-protocol for every implemented version we can handle
    manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
    for i, version := range ProtocolVersions {
        // Skip protocol version if incompatible with the mode of operation
        if mode == downloader.FastSync && version < eth63 {
            continue
        }
        // Compatible; initialise the sub-protocol
        version := version // Closure for the run
        manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
            Name:    ProtocolName,
            Version: version,
            Length:  ProtocolLengths[i],
            Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                peer := manager.newPeer(int(version), p, rw)
                select {
                case manager.newPeerCh <- peer:
                    manager.wg.Add(1)
                    defer manager.wg.Done()
                    return manager.handle(peer)
                case <-manager.quitSync:
                    return p2p.DiscQuitting
                }
            },
            NodeInfo: func() interface{} {
                return manager.NodeInfo()
            },
            PeerInfo: func(id discover.NodeID) interface{} {
                if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                    return p.Info()
                }
                return nil
            },
        })
    }
    if len(manager.SubProtocols) == 0 {
        return nil, errIncompatibleConfig
    }
    // The downloader synchronises local data from other peers.
    // It is the full-chain synchronisation tool.
    // Construct the different synchronisation mechanisms
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

    validator := func(header *types.Header) error {
        return engine.VerifyHeader(blockchain, header, true)
    }
    heighter := func() uint64 {
        return blockchain.CurrentBlock().NumberU64()
    }
    inserter := func(blocks types.Blocks) (int, error) {
        // If fast sync is running, deny importing weird blocks
        if atomic.LoadUint32(&manager.fastSync) == 1 {
            log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
            return 0, nil
        }
        // Start accepting transactions
        atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
        return manager.blockchain.InsertChain(blocks)
    }
    // Create a fetcher.
    // The fetcher accumulates block announcements from the various peers and schedules their retrieval.
    manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

    return manager, nil
}
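```
`NewProtocolManager` manages the sub-protocols that make up the Ethereum protocol. The `Run` function of each sub-protocol is called for every peer once it connects, and as the code shows, it blocks. Following it into the `handle` method reveals this key piece of code: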
```
for {
    if err := pm.handleMsg(p); err != nil {
        p.Log().Debug("Ethereum message handling failed", "err", err)
        return err
    }
}
```
This is an infinite loop that processes messages arriving from the p2p network. Next, look at the `handleMsg` method:
```
func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer, and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    defer msg.Discard()

    // Handle the message depending on its contents
    switch {
    case msg.Code == StatusMsg:
        // Status messages should never arrive after the handshake
        return errResp(ErrExtraStatusMsg, "uncontrolled status message")

    // Block header query, collect the requested headers and reply
    case msg.Code == GetBlockHeadersMsg:
        // Decode the complex header query
        var query getBlockHeadersData
        if err := msg.Decode(&query); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        hashMode := query.Origin.Hash != (common.Hash{})

        // Gather headers until the fetch or network limits is reached
        var (
            bytes   common.StorageSize
            headers []*types.Header
            unknown bool
        )
        for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
            // Retrieve the next header satisfying the query
            var origin *types.Header
            if hashMode {
                origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
            } else {
                origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
            }
            if origin == nil {
                break
            }
            number := origin.Number.Uint64()
            headers = append(headers, origin)
            bytes += estHeaderRlpSize

            // Advance to the next header of the query
            switch {
            case query.Origin.Hash != (common.Hash{}) && query.Reverse:
                // Hash based traversal towards the genesis block
                for i := 0; i < int(query.Skip)+1; i++ {
                    if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
                        query.Origin.Hash = header.ParentHash
                        number--
                    } else {
                        unknown = true
                        break
                    }
                }
            case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
                // Hash based traversal towards the leaf block
                var (
                    current = origin.Number.Uint64()
                    next    = current + query.Skip + 1
                )
                if next <= current {
                    infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
                    p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
                    unknown = true
                } else {
                    if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
                        if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
                            query.Origin.Hash = header.Hash()
                        } else {
                            unknown = true
                        }
                    } else {
                        unknown = true
                    }
                }
            case query.Reverse:
                // Number based traversal towards the genesis block
                if query.Origin.Number >= query.Skip+1 {
                    query.Origin.Number -= (query.Skip + 1)
                } else {
                    unknown = true
                }

            case !query.Reverse:
                // Number based traversal towards the leaf block
                query.Origin.Number += (query.Skip + 1)
            }
        }
        return p.SendBlockHeaders(headers)

    case msg.Code == BlockHeadersMsg:
        // A batch of headers arrived to one of our previous requests
        var headers []*types.Header
        if err := msg.Decode(&headers); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // If no headers were received, but we're expending a DAO fork check, maybe it's that
        if len(headers) == 0 && p.forkDrop != nil {
            // Possibly an empty reply to the fork header checks, sanity check TDs
            verifyDAO := true

            // If we already have a DAO header, we can check the peer's TD against it. If
            // the peer's ahead of this, it too must have a reply to the DAO check
            if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
                if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                    verifyDAO = false
                }
            }
            // If we're seemingly on the same chain, disable the drop timer
            if verifyDAO {
                p.Log().Debug("Seems to be on the same side of the DAO fork")
                p.forkDrop.Stop()
                p.forkDrop = nil
                return nil
            }
        }
        // Filter out any explicitly requested headers, deliver the rest to the downloader
        filter := len(headers) == 1
        if filter {
            // If it's a potential DAO fork check, validate against the rules
            if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
                // Disable the fork drop timer
                p.forkDrop.Stop()
                p.forkDrop = nil

                // Validate the header and either drop the peer or continue
                if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                    p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                    return err
                }
                p.Log().Debug("Verified to be on the same side of the DAO fork")
                return nil
            }
            // Irrelevant of the fork checks, send the header to the fetcher just in case
            headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
        }
        if len(headers) > 0 || !filter {
            err := pm.downloader.DeliverHeaders(p.id, headers)
            if err != nil {
                log.Debug("Failed to deliver headers", "err", err)
            }
        }

    case msg.Code == GetBlockBodiesMsg:
        // Decode the retrieval message
        msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
        if _, err := msgStream.List(); err != nil {
            return err
        }
        // Gather blocks until the fetch or network limits is reached
        var (
            hash   common.Hash
            bytes  int
            bodies []rlp.RawValue
        )
        for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
            // Retrieve the hash of the next block
            if err := msgStream.Decode(&hash); err == rlp.EOL {
                break
            } else if err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Retrieve the requested block body, stopping if enough was found
            if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
                bodies = append(bodies, data)
                bytes += len(data)
            }
        }
        return p.SendBlockBodiesRLP(bodies)

    case msg.Code == BlockBodiesMsg:
        // A batch of block bodies arrived to one of our previous requests
        var request blockBodiesData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // Deliver them all to the downloader for queuing
        trasactions := make([][]*types.Transaction, len(request))
        uncles := make([][]*types.Header, len(request))

        for i, body := range request {
            trasactions[i] = body.Transactions
            uncles[i] = body.Uncles
        }
        // Filter out any explicitly requested bodies, deliver the rest to the downloader
        filter := len(trasactions) > 0 || len(uncles) > 0
        if filter {
            trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
        }
        if len(trasactions) > 0 || len(uncles) > 0 || !filter {
            err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
            if err != nil {
                log.Debug("Failed to deliver bodies", "err", err)
            }
        }

    case p.version >= eth63 && msg.Code == GetNodeDataMsg:
        // Decode the retrieval message
        msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
        if _, err := msgStream.List(); err != nil {
            return err
        }
        // Gather state data until the fetch or network limits is reached
        var (
            hash  common.Hash
            bytes int
            data  [][]byte
        )
        for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
            // Retrieve the hash of the next state entry
            if err := msgStream.Decode(&hash); err == rlp.EOL {
                break
            } else if err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Retrieve the requested state entry, stopping if enough was found
            if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
                data = append(data, entry)
                bytes += len(entry)
            }
        }
        return p.SendNodeData(data)

    case p.version >= eth63 && msg.Code == NodeDataMsg:
        // A batch of node state data arrived to one of our previous requests
        var data [][]byte
        if err := msg.Decode(&data); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // Deliver all to the downloader
        if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
            log.Debug("Failed to deliver node state data", "err", err)
        }

    case p.version >= eth63 && msg.Code == GetReceiptsMsg:
        // Decode the retrieval message
        msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
        if _, err := msgStream.List(); err != nil {
            return err
        }
        // Gather state data until the fetch or network limits is reached
        var (
            hash     common.Hash
            bytes    int
            receipts []rlp.RawValue
        )
        for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
            // Retrieve the hash of the next block
            if err := msgStream.Decode(&hash); err == rlp.EOL {
                break
            } else if err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Retrieve the requested block's receipts, skipping if unknown to us
            results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash))
            if results == nil {
                if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
                    continue
                }
            }
            // If known, encode and queue for response packet
            if encoded, err := rlp.EncodeToBytes(results); err != nil {
                log.Error("Failed to encode receipt", "err", err)
            } else {
                receipts = append(receipts, encoded)
                bytes += len(encoded)
            }
        }
        return p.SendReceiptsRLP(receipts)

    case p.version >= eth63 && msg.Code == ReceiptsMsg:
        // A batch of receipts arrived to one of our previous requests
        var receipts [][]*types.Receipt
        if err := msg.Decode(&receipts); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // Deliver all to the downloader
        if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
            log.Debug("Failed to deliver receipts", "err", err)
        }

    case msg.Code == NewBlockHashesMsg:
        var announces newBlockHashesData
        if err := msg.Decode(&announces); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        // Mark the hashes as present at the remote node
        for _, block := range announces {
            p.MarkBlock(block.Hash)
        }
        // Schedule all the unknown hashes for retrieval
        unknown := make(newBlockHashesData, 0, len(announces))
        for _, block := range announces {
            if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                unknown = append(unknown, block)
            }
        }
        for _, block := range unknown {
            pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
        }

    case msg.Code == NewBlockMsg:
        // Retrieve and decode the propagated block
        var request newBlockData
        if err := msg.Decode(&request); err != nil {
            return errResp(ErrDecode, "%v: %v", msg, err)
        }
        request.Block.ReceivedAt = msg.ReceivedAt
        request.Block.ReceivedFrom = p

        // Mark the peer as owning the block and schedule it for import
        p.MarkBlock(request.Block.Hash())
        pm.fetcher.Enqueue(p.id, request.Block)

        // Assuming the block is importable by the peer, but possibly not yet done so,
        // calculate the head hash and TD that the peer truly must have.
        var (
            trueHead = request.Block.ParentHash()
            trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
        )
        // Update the peers total difficulty if better than the previous
        if _, td := p.Head(); trueTD.Cmp(td) > 0 {
            p.SetHead(trueHead, trueTD)

            // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
            // a singe block (as the true TD is below the propagated block), however this
            // scenario should easily be covered by the fetcher.
            currentBlock := pm.blockchain.CurrentBlock()
            if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                go pm.synchronise(p)
            }
        }

    case msg.Code == TxMsg:
        // Transactions arrived, make sure we have a valid and fresh chain to handle them
        if atomic.LoadUint32(&pm.acceptTxs) == 0 {
            break
        }
        // Transactions can be processed, parse all of them and deliver to the pool
        var txs []*types.Transaction
        if err := msg.Decode(&txs); err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        for i, tx := range txs {
            // Validate and mark the remote transaction
            if tx == nil {
                return errResp(ErrDecode, "transaction %d is nil", i)
            }
            p.MarkTransaction(tx.Hash())
        }
        pm.txpool.AddRemotes(txs)

    default:
        return errResp(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}
```
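In the `NewBlockMsg` case, the peer's "true" total difficulty is the TD it announced for the new block minus that block's own difficulty, because the peer is only guaranteed to hold the parent. A sync is scheduled only when this value is strictly above the local TD. The arithmetic can be sketched with `math/big`; `peerTrueTD`, `needsSync` and the `bi` helper are hypothetical names introduced for the illustration.

```go
package main

import (
	"fmt"
	"math/big"
)

// bi is a small helper for building big integers in the example.
func bi(n int64) *big.Int { return big.NewInt(n) }

// peerTrueTD returns the total difficulty the sender must already have:
// the TD announced with the new block minus the block's own difficulty.
func peerTrueTD(announcedTD, blockDifficulty *big.Int) *big.Int {
	return new(big.Int).Sub(announcedTD, blockDifficulty)
}

// needsSync reports whether the peer's true TD is strictly above the local TD.
func needsSync(peerTD, localTD *big.Int) bool {
	return peerTD.Cmp(localTD) > 0
}

func main() {
	td := peerTrueTD(bi(1000), bi(100))
	fmt.Println("peer true TD:", td)
	fmt.Println("sync when local TD is 850:", needsSync(td, bi(850)))
	fmt.Println("sync when local TD is 900:", needsSync(td, bi(900)))
}
```

When the peer is exactly one block ahead, its true TD equals the local TD and no sync fires; as the comment in the original code notes, the fetcher covers that single-block gap.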
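`handleMsg` decodes the messages arriving from the p2p network and handles both `NewBlockMsg` and `NewBlockHashesMsg`. For `NewBlockMsg`, the block is handed straight to the local fetcher through a channel via `pm.fetcher.Enqueue(p.id, request.Block)`; the channel is `f.inject`. Behind the channel sits a queue: the `enqueue` method in /fetcher.go pushes the block into it. Note that the queue is ordered by block number (the priority is the negated block number, so lower blocks are popped first) rather than being strictly FIFO.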
```
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}
```
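The two guards at the top of `enqueue` (a per-peer allowance and a distance window around the current chain height) can be isolated as a pure function. In this sketch the `limits` struct, the `admit` function and the numeric values are assumptions chosen for illustration; the real constants are defined in the fetcher package.

```go
package main

import "fmt"

// limits bundles the thresholds used by the admission checks.
// The field values used below are illustrative, not read from go-ethereum.
type limits struct {
	perPeer      int   // maximum blocks queued per peer
	maxUncleDist int64 // how far behind the head a block may be
	maxQueueDist int64 // how far ahead of the head a block may be
}

// admit reports whether a propagated block would be queued, and if not, why.
func admit(l limits, queuedFromPeer int, blockNumber, chainHeight uint64) (bool, string) {
	// Guard 1: the peer may not exceed its allowance.
	if queuedFromPeer+1 > l.perPeer {
		return false, "exceeded allowance"
	}
	// Guard 2: the block must lie within the window around the chain head.
	dist := int64(blockNumber) - int64(chainHeight)
	if dist < -l.maxUncleDist || dist > l.maxQueueDist {
		return false, "too far away"
	}
	return true, ""
}

func main() {
	l := limits{perPeer: 64, maxUncleDist: 7, maxQueueDist: 32}
	for _, n := range []uint64{92, 93, 101, 132, 133} {
		ok, why := admit(l, 0, n, 100)
		fmt.Printf("block %d at height 100: ok=%v %s\n", n, ok, why)
	}
}
```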
The consumer of the fetcher's queue is the `loop` method in /fetcher.go, which is an infinite loop. The core code:
```
for !f.queue.Empty() {
    op := f.queue.PopItem().(*inject)
    if f.queueChangeHook != nil {
        f.queueChangeHook(op.block.Hash(), false)
    }
    // If too high up the chain or phase, continue later
    number := op.block.NumberU64()
    if number > height+1 {
        f.queue.Push(op, -float32(op.block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        break
    }
    // Otherwise if fresh and still unknown, try and import
    hash := op.block.Hash()
    if number+maxUncleDist < height || f.getBlock(hash) != nil {
        f.forgetBlock(hash)
        continue
    }
    f.insert(op.origin, op.block)
}
```
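The drain loop above pops the lowest block number first, puts a block back and stops if it is more than one ahead of the chain head, and skips blocks that are stale or already known. Below is a rough model using the standard `container/heap` package in place of the queue used by the fetcher. The `numberHeap` type, `newQueue`, `drain`, and keying "known" blocks by number instead of hash are simplifications made for this sketch.

```go
package main

import (
	"container/heap"
	"fmt"
)

// numberHeap is a min-heap of block numbers (lowest number pops first).
type numberHeap []uint64

func (h numberHeap) Len() int            { return len(h) }
func (h numberHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h numberHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *numberHeap) Push(x interface{}) { *h = append(*h, x.(uint64)) }
func (h *numberHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// newQueue builds a heap from the given block numbers.
func newQueue(numbers ...uint64) *numberHeap {
	h := numberHeap(numbers)
	heap.Init(&h)
	return &h
}

// drain returns the block numbers that would be handed to insert.
// height is sampled once, as in the original loop.
func drain(h *numberHeap, height, maxUncleDist uint64, known map[uint64]bool) []uint64 {
	var imported []uint64
	for h.Len() > 0 {
		number := heap.Pop(h).(uint64)
		if number > height+1 {
			heap.Push(h, number) // too far ahead: requeue and retry later
			break
		}
		if number+maxUncleDist < height || known[number] {
			continue // stale or already known: forget it
		}
		imported = append(imported, number)
	}
	return imported
}

func main() {
	q := newQueue(103, 101, 90, 102)
	got := drain(q, 100, 7, map[uint64]bool{})
	fmt.Println("imported:", got, "still queued:", q.Len())
}
```

Because the height is read once per pass, only the direct successor of the head is imported in a single pass; blocks further ahead stay queued until a later iteration.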
Each block popped from the queue is then passed to the `insert` method:
```
func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()

    // Run the import on a new thread
    log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
    go func() {
        defer func() { f.done <- hash }()

        // If the parent's unknown, abort insertion
        parent := f.getBlock(block.ParentHash())
        if parent == nil {
            log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
            return
        }
        // Quickly validate the header and propagate the block if it passes
        switch err := f.verifyHeader(block.Header()); err {
        case nil:
            // All ok, quickly propagate to our peers
            propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, true)

        case consensus.ErrFutureBlock:
            // Weird future block, don't fail, but neither propagate

        default:
            // Something went very wrong, drop the peer
            log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            f.dropPeer(peer)
            return
        }
        // Run the actual import and log any issues
        if _, err := f.insertChain(types.Blocks{block}); err != nil {
            log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            return
        }
        // If import succeeded, broadcast the block
        propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
        go f.broadcastBlock(block, false)

        // Invoke the testing hook if needed
        if f.importedHook != nil {
            f.importedHook(block)
        }
    }()
}
```
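As shown, `insert` calls `verifyHeader` to validate the block header. If the header passes, the block is broadcast over p2p, and then `insertChain` is called to insert it into the local leveldb. If the insertion succeeds, the block is broadcast once more, but this time only its hash is announced. In this way, through the peer-to-peer network, any valid block ends up being adopted by the whole network. Both `verifyHeader` and `insertChain` are defined in `/handler.go/NewProtocolManager` and passed in (as `validator` and `inserter`). All of the startup logic lives in `handler.go/Start`; the fetcher's start method is triggered from the `syncer` method, which runs in its own goroutine.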
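The decision order inside `insert` (verify the header, propagate the full block, import, then announce the hash) can be traced with a small sequential sketch. The real code runs the broadcasts in separate goroutines and first checks that the parent is known; the `hooks` struct, `hooksWith`, `importBlock` and both error values here are hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values standing in for consensus errors.
var (
	errFutureBlock = errors.New("block in the future")
	errBadHeader   = errors.New("invalid header")
)

// hooks carries the two callbacks that the fetcher receives from outside.
type hooks struct {
	verifyHeader func(number uint64) error
	insertChain  func(number uint64) error
}

// hooksWith builds hooks that return fixed results, for demonstration.
func hooksWith(verifyErr, insertErr error) hooks {
	return hooks{
		verifyHeader: func(uint64) error { return verifyErr },
		insertChain:  func(uint64) error { return insertErr },
	}
}

// importBlock returns the ordered list of actions taken for one block.
func importBlock(h hooks, number uint64) []string {
	var actions []string
	switch err := h.verifyHeader(number); err {
	case nil:
		actions = append(actions, "propagate full block")
	case errFutureBlock:
		// Future block: do not fail, but do not propagate either.
	default:
		return append(actions, "drop peer")
	}
	if err := h.insertChain(number); err != nil {
		return append(actions, "import failed")
	}
	return append(actions, "announce hash")
}

func main() {
	fmt.Println(importBlock(hooksWith(nil, nil), 101))
	fmt.Println(importBlock(hooksWith(errFutureBlock, nil), 101))
	fmt.Println(importBlock(hooksWith(errBadHeader, nil), 101))
}
```

Propagating before the import finishes is what lets a valid block travel quickly: only the cheap header check gates the first broadcast, while the hash announcement waits for the full import.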
The synchronisation path is `/handler.go/handleMsg --> go pm.synchronise(p) --> pm.downloader.Synchronise(peer.id, pHead, pTd, mode) --> d.synchronise(id, head, td, mode) --> d.syncWithPeer(p, hash, td)`. Let's look at the core method:
```go
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
    d.mux.Post(StartEvent{})
    defer func() {
        // reset on error
        if err != nil {
            d.mux.Post(FailedEvent{err})
        } else {
            d.mux.Post(DoneEvent{})
        }
    }()
    if p.version < 62 {
        return errTooOld
    }

    log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
    defer func(start time.Time) {
        log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
    }(time.Now())

    // Look up the sync boundaries: the common ancestor and the target block
    latest, err := d.fetchHeight(p)
    if err != nil {
        return err
    }
    height := latest.Number.Uint64()

    origin, err := d.findAncestor(p, height)
    if err != nil {
        return err
    }
    d.syncStatsLock.Lock()
    if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
        d.syncStatsChainOrigin = origin
    }
    d.syncStatsChainHeight = height
    d.syncStatsLock.Unlock()

    // Initiate the sync using a concurrent header and content retrieval algorithm
    pivot := uint64(0)
    switch d.mode {
    case LightSync:
        pivot = height
    case FastSync:
        // Calculate the new fast/slow sync pivot point
        if d.fsPivotLock == nil {
            pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
            if err != nil {
                panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
            }
            if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
                pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
            }
        } else {
            // Pivot point locked in, use this and do not pick a new one!
            pivot = d.fsPivotLock.Number.Uint64()
        }
        // If the point is below the origin, move origin back to ensure state download
        if pivot < origin {
            if pivot > 0 {
                origin = pivot - 1
            } else {
                origin = 0
            }
        }
        log.Debug("Fast syncing until pivot block", "pivot", pivot)
    }
    d.queue.Prepare(origin+1, d.mode, pivot, latest)
    if d.syncInitHook != nil {
        d.syncInitHook(origin, height)
    }

    fetchers := []func() error{
        func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
        func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
        func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
        func() error { return d.processHeaders(origin+1, td) },
    }
    if d.mode == FastSync {
        fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
    } else if d.mode == FullSync {
        fetchers = append(fetchers, d.processFullSyncContent)
    }
    err = d.spawnSync(fetchers)
    if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
        // If sync failed in the critical section, bump the fail counter.
        atomic.AddUint32(&d.fsPivotFails, 1)
    }
    return err
}
```
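The pivot arithmetic in the `FastSync` branch of `syncWithPeer` is easier to follow in isolation. The following is a minimal sketch, not the downloader's actual code: `choosePivot` and `adjustOrigin` are hypothetical helper names, and the constants `minFull` and `interval` are illustrative stand-ins for `fsMinFullBlocks` and `fsPivotInterval`, whose real values are not shown in this article.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// choosePivot mirrors the unlocked FastSync branch: leave at least minFull
// recent blocks (plus a random offset) to be processed fully. It returns 0
// when the chain is too short for a pivot.
func choosePivot(height, minFull, offset uint64) uint64 {
	if height > minFull+offset {
		return height - minFull - offset
	}
	return 0
}

// adjustOrigin moves the sync origin back when the pivot falls below it,
// so that the state at the pivot is still downloaded.
func adjustOrigin(origin, pivot uint64) uint64 {
	if pivot < origin {
		if pivot > 0 {
			return pivot - 1
		}
		return 0
	}
	return origin
}

func main() {
	// Hypothetical values, chosen only for illustration.
	const minFull, interval = 1024, 256

	off, err := rand.Int(rand.Reader, big.NewInt(interval))
	if err != nil {
		panic(err)
	}
	pivot := choosePivot(5000, minFull, off.Uint64())
	fmt.Println("pivot:", pivot, "origin:", adjustOrigin(4500, pivot))
}
```

The random offset means peers cannot predict exactly which block a syncing node will pick as its pivot.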
Because this whole call stack is triggered from the `NewBlockMsg` case, the `StartEvent` posted in `syncWithPeer` is delivered through a channel to `update` in miner.go:
```go
func (self *Miner) update() {
    events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
    for ev := range events.Chan() {
        switch ev.Data.(type) {
        case downloader.StartEvent:
            atomic.StoreInt32(&self.canStart, 0)
            if self.Mining() {
                self.Stop()
                atomic.StoreInt32(&self.shouldStart, 1)
                log.Info("Mining aborted due to sync")
            }
        case downloader.DoneEvent, downloader.FailedEvent:
            shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

            atomic.StoreInt32(&self.canStart, 1)
            atomic.StoreInt32(&self.shouldStart, 0)
            if shouldStart {
                self.Start(self.coinbase)
            }
            // unsubscribe. we're only interested in this event once
            events.Unsubscribe()
            // stop immediately and ignore all further pending events
            break out
        }
    }
}
```
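When the miner receives this `StartEvent`, it notifies all of its agents and calls `stop` so that they stop mining the block that has just arrived from the network. For a remote miner, that is the `Stop` method of `RemoteAgent`. Once the sync ends with `DoneEvent` or `FailedEvent`, mining resumes if it was running before.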
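The pause-and-resume logic of `update` can be reduced to a small state machine. The sketch below is an illustration under simplifying assumptions, not geth's implementation: the `miner` struct, the `syncEvent` type and the `handle` method are hypothetical names, and a plain buffered channel stands in for the event mux subscription.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// syncEvent is a hypothetical stand-in for the downloader's event types.
type syncEvent int

const (
	startEvent syncEvent = iota
	doneEvent
	failedEvent
)

// miner keeps the same three flags that Miner.update manipulates.
type miner struct {
	mining, canStart, shouldStart int32
}

// handle processes one event and reports whether the loop should end.
func (m *miner) handle(ev syncEvent) (finished bool) {
	switch ev {
	case startEvent:
		// Sync started: forbid new mining and pause any running work.
		atomic.StoreInt32(&m.canStart, 0)
		if atomic.LoadInt32(&m.mining) == 1 {
			atomic.StoreInt32(&m.mining, 0)
			atomic.StoreInt32(&m.shouldStart, 1)
		}
		return false
	default: // doneEvent or failedEvent
		resume := atomic.LoadInt32(&m.shouldStart) == 1
		atomic.StoreInt32(&m.canStart, 1)
		atomic.StoreInt32(&m.shouldStart, 0)
		if resume {
			atomic.StoreInt32(&m.mining, 1)
		}
		return true
	}
}

func main() {
	m := &miner{mining: 1, canStart: 1}
	events := make(chan syncEvent, 2)
	events <- startEvent
	events <- doneEvent
	close(events)

	for ev := range events {
		if m.handle(ev) {
			break // only the first completed sync is of interest
		}
	}
	fmt.Println("mining:", atomic.LoadInt32(&m.mining) == 1)
}
```

Remembering the previous state in `shouldStart` is what lets a node that was mining before the sync pick up again automatically, while a node that was idle stays idle.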

Finally, let's look at how a new block is broadcast to other nodes. This is handled by `BroadcastBlock` in `/eth/handler.go`:
```go
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)

    // If propagation is requested, send to a subset of the peer
    if propagate {
        // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the block to a subset of our peers
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.SendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise if the block is indeed in out own chain, announce it
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}
```
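The `transfer` slice in `BroadcastBlock` holds only the first ⌊√n⌋ of the n peers that do not yet know the block. The sketch below isolates that selection; `propagationSubset` is a hypothetical helper name, and peers are represented as plain strings for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// propagationSubset returns the first floor(sqrt(n)) peers, the same
// slice expression BroadcastBlock uses to pick recipients of the full block.
func propagationSubset(peers []string) []string {
	return peers[:int(math.Sqrt(float64(len(peers))))]
}

func main() {
	peers := make([]string, 0, 25)
	for i := 0; i < 25; i++ {
		peers = append(peers, fmt.Sprintf("peer-%d", i))
	}
	subset := propagationSubset(peers)
	fmt.Println("full block sent to", len(subset), "of", len(peers), "peers")
}
```

Sending the full block to only a square-root-sized subset limits bandwidth. The remaining peers learn about the block through the hash announcement path (`SendNewBlockHashes`) and can fetch it on demand.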

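When `propagate` is true, `BroadcastBlock` loops over a subset of the connected peers that do not yet have the block (the first √n of them) and calls `peer.SendNewBlock` to send each one the newly produced block. Otherwise it only announces the block hash to those peers.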
```go
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
    p.knownBlocks.Add(block.Hash())
    return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
```
```go
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
    size, r, err := rlp.EncodeToReader(data)
    if err != nil {
        return err
    }
    return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
```
The RLP-encoded message is written to the peer through `WriteMsg`, which is implemented by `(rw *netWrapper) WriteMsg(msg Msg)`:
```go
func (rw *netWrapper) WriteMsg(msg Msg) error {
    rw.wmu.Lock()
    defer rw.wmu.Unlock()
    rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
    return rw.wrapped.WriteMsg(msg)
}
```
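This method sets a write timeout and, underneath, calls `Write(b []byte) (n int, err error)` in net.go, which sends the bytes over the network to the corresponding peer. On the receiving side, the message is read with `ReadMsg`: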
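The effect of setting a write deadline before each write can be shown with the standard library alone. This is a minimal sketch under stated assumptions: `deadlineWriter` and `writeTimesOut` are hypothetical names, and an in-memory `net.Pipe` with no reader stands in for a stalled remote peer.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"sync"
	"time"
)

// deadlineWriter serialises writes and bounds each one with a timeout,
// in the same spirit as netWrapper.WriteMsg.
type deadlineWriter struct {
	mu      sync.Mutex
	conn    net.Conn
	timeout time.Duration
}

func (w *deadlineWriter) Write(b []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if err := w.conn.SetWriteDeadline(time.Now().Add(w.timeout)); err != nil {
		return 0, err
	}
	return w.conn.Write(b)
}

// writeTimesOut writes to a pipe whose other end never reads and reports
// whether the write failed with a deadline error instead of blocking forever.
func writeTimesOut() bool {
	local, remote := net.Pipe()
	defer local.Close()
	defer remote.Close()

	w := &deadlineWriter{conn: local, timeout: 50 * time.Millisecond}
	_, err := w.Write([]byte("new block"))
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("write timed out:", writeTimesOut())
}
```

Without the deadline, a single unresponsive peer could block the writer indefinitely while it holds the write lock.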
```go
func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer, and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    defer msg.Discard()
    // ...
}
```
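Here the message written by the remote side is read from the network and then handled according to its `msgCode`. Because `handleMsg` is called in an infinite loop, the node keeps receiving the messages that peers broadcast to it: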
```go
// eth/handler.go
func (pm *ProtocolManager) handle(p *peer) error {
    td, head, genesis := pm.blockchain.Status()
    p.Handshake(pm.networkId, td, head, genesis)

    if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
        rw.Init(p.version)
    }

    pm.peers.Register(p)
    defer pm.removePeer(p.id)

    pm.downloader.RegisterPeer(p.id, p.version, p)

    pm.syncTransactions(p)
    ...
    for {
        if err := pm.handleMsg(p); err != nil {
            return err
        }
    }
}
```
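For each new peer, handle() does the following. It performs the handshake, exchanging the local blockchain status with the remote peer. It initialises a read/write channel used to exchange data with that peer. It registers the peer in the local peer list; the peer is removed from the list only when handle() returns. It registers the peer with the Downloader, which maintains its own list of neighbouring peers. It calls syncTransactions(), which packs the transactions currently accumulated in the txpool into a txsync{} object and pushes it onto the internal channel txsyncCh. Recall the four functions launched by Start(): the fourth, txsyncLoop(), waits for txsync{} data on txsyncCh, and this is where that data is pushed in. Finally, it calls handleMsg() in an infinite loop, so that whenever the remote peer sends a message, handleMsg() picks it up by type and processes it locally.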
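The read-and-dispatch loop at the end of handle() can be sketched without any networking. The code below is an illustration only: `msg`, `reader`, `sliceReader` and the counting dispatch are hypothetical simplifications, and the two message codes are illustrative constants rather than values taken from this article.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// msg is a simplified stand-in for a p2p message.
type msg struct {
	code    uint64
	payload string
}

// reader abstracts the ReadMsg side of a peer connection.
type reader interface {
	ReadMsg() (msg, error)
}

// sliceReader replays a fixed list of messages, then reports io.EOF.
type sliceReader struct{ msgs []msg }

func (r *sliceReader) ReadMsg() (msg, error) {
	if len(r.msgs) == 0 {
		return msg{}, io.EOF
	}
	m := r.msgs[0]
	r.msgs = r.msgs[1:]
	return m, nil
}

// Illustrative message codes for this sketch.
const (
	txMsg       uint64 = 0x02
	newBlockMsg uint64 = 0x07
)

// handleMsg reads one message and dispatches on its code.
func handleMsg(r reader, seen map[uint64]int) error {
	m, err := r.ReadMsg()
	if err != nil {
		return err
	}
	switch m.code {
	case newBlockMsg, txMsg:
		seen[m.code]++ // real code would decode and process the payload
		return nil
	default:
		return fmt.Errorf("unknown message code %#x", m.code)
	}
}

// handle loops until handleMsg fails, which is when the peer is dropped.
func handle(r reader) (map[uint64]int, error) {
	seen := map[uint64]int{}
	for {
		if err := handleMsg(r, seen); err != nil {
			return seen, err
		}
	}
}

func main() {
	r := &sliceReader{msgs: []msg{{newBlockMsg, "block"}, {txMsg, "tx"}}}
	seen, err := handle(r)
	fmt.Println(seen[newBlockMsg], seen[txMsg], errors.Is(err, io.EOF))
}
```

The loop has no exit condition of its own: the only way out is an error from reading or handling a message, which matches the way handle() returns and the deferred removePeer call then runs.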