1. 程式人生 > >【我的區塊鏈之路】- 以太坊原始碼剖析之Geth 1.8.14版本挖礦邏輯調整

【我的區塊鏈之路】- 以太坊原始碼剖析之Geth 1.8.14版本挖礦邏輯調整

今天為什麼寫這個文章呢,首先,前段時間有朋友問過我,說現在geth的1.8.14版本的程式碼和網上各路大神們的分析不一樣了。我就趕緊看了下,確實,親的geth程式碼中的mine部分的邏輯有所改動,想必看過原始碼的都知道,之前的miner真正挖礦是由worker把所需挖礦的內容全部封裝成Work再交由Agent去操作的,而新版本的mining邏輯是移除了Agent全部由worker自己去操作了,worker起了四個goroutine去做這些事。那麼,對於不清楚老版本mining邏輯的朋友也不要怕,我都會一一道來。首先,我們先看看老版本的mining是怎麼回事:

在老程式碼中miner包主要由miner.go

 worker.go agent.go 三個檔案組成 【以下圖是網上扣的哦】

  • Miner 負責與外部互動和高層次的挖礦控制
  • worker 負責低層次的挖礦控制 管理下屬所有Agent
  • Agent 負責實際的挖礦計算工作

三者之間的頂層聯絡如下圖所示

在老版本中我們可以知道,挖礦的入口是miner;

當例項化一個miner例項時,順便做了幾件事,一例項化了worker和註冊了Agent例項CpuAgent並且啟動了一個 update函式去做事件的監聽【啟動挖礦 或 停止挖礦】

miner的update:

這個update()會訂閱(監聽)幾種事件,均跟Downloader相關

。【1】當收到Downloader的StartEvent時,意味者此時本節點正在從其他節點下載新區塊,這時miner會立即停止進行中的挖掘工作,並繼續監聽;【2】如果收到DoneEvent或FailEvent時,意味本節點的下載任務已結束-無論下載成功或失敗-此時都可以開始挖掘新區塊,並且此時會退出Downloader事件的監聽

我們再來看看在例項化一個miner的時候順便例項化的worker是做了些什麼事情【即在miner的New函式中所呼叫了worker的newWorker函式】注意:此時,CpuAgent還未啟動

worker的newWorker:

可以看出,在例項化了worker物件後,分別註冊了3個監聽調,而真正實時在監聽的是在新起的協程 go worker.update() 中所做的事【待會再說】。及啟動了協程 go worker.wait() 這個主要做把接收到挖好礦的Block上鍊的;及呼叫了一個 worker.commitNewWork() 把需要挖礦的資訊都打包成Work 物件傳遞給CpuAgent。

go worker.update() :

func (self *worker) update() {
	defer self.txsSub.Unsubscribe()
	defer self.chainHeadSub.Unsubscribe()
	defer self.chainSideSub.Unsubscribe()

	for {
		// A real event arrived, process interesting content
		select {
		// Handle ChainHeadEvent
		case <-self.chainHeadCh:
			self.commitNewWork()

		// Handle ChainSideEvent
		case ev := <-self.chainSideCh:
			self.uncleMu.Lock()
			self.possibleUncles[ev.Block.Hash()] = ev.Block
			self.uncleMu.Unlock()

		// Handle NewTxsEvent
		case ev := <-self.txsCh:
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
			if atomic.LoadInt32(&self.mining) == 0 {
				self.currentMu.Lock()
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
					acc, _ := types.Sender(self.current.signer, tx)
					txs[acc] = append(txs[acc], tx)
				}
				txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
				self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
				self.updateSnapshot()
				self.currentMu.Unlock()
			} else {
				// If we're mining, but nothing is being processed, wake on new transactions
				if self.config.Clique != nil && self.config.Clique.Period == 0 {
					self.commitNewWork()
				}
			}

		// System stopped
		case <-self.txsSub.Err():
			return
		case <-self.chainHeadSub.Err():
			return
		case <-self.chainSideSub.Err():
			return
		}
	}
}

worker.update()分別監聽ChainHeadEventChainSideEventTxPreEvent 幾個事件,每個事件會觸發worker不同的反應。ChainHeadEvent是指區塊鏈中已經加入了一個新的區塊作為整個鏈的鏈頭,這時worker的迴應是立即開始準備挖掘下一個新區塊(也是夠忙的);ChainSideEvent指區塊鏈中加入了一個新區塊作為當前鏈頭的旁支,worker會把這個區塊收納進possibleUncles[]陣列,作為下一個挖掘新區塊可能的Uncle之一;TxPreEvent是TxPool物件發出的,指的是一個新的交易tx被加入了TxPool,這時如果worker沒有處於挖掘中,那麼就去執行這個tx,並把它收納進Work.txs陣列,為下次挖掘新區塊備用。

【注意】:ChainHeadEvent並不一定是外部源發出。由於worker物件有個成員變數chain(eth.BlockChain),所以當worker自己完成挖掘一個新區塊,並把它寫入資料庫,加進區塊鏈裡成為新的鏈頭時,worker自己也可以呼叫chain發出一個ChainHeadEvent,從而被worker.update()函式監聽到,進入下一次區塊挖掘。

go worker.wait():

func (self *worker) wait() {
	for {
		for result := range self.recv {
			atomic.AddInt32(&self.atWork, -1)

			if result == nil {
				continue
			}
			block := result.Block
			work := result.Work

			// Update the block hash in all logs since it is now available and not when the
			// receipt/log of individual transactions were created.
			for _, r := range work.receipts {
				for _, l := range r.Logs {
					l.BlockHash = block.Hash()
				}
			}
			for _, log := range work.state.Logs() {
				log.BlockHash = block.Hash()
			}
			self.currentMu.Lock()
			stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
			self.currentMu.Unlock()
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
			// Broadcast the block and announce chain insertion event
			self.mux.Post(core.NewMinedBlockEvent{Block: block})
			var (
				events []interface{}
				logs   = work.state.Logs()
			)
			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
			if stat == core.CanonStatTy {
				events = append(events, core.ChainHeadEvent{Block: block})
			}
			self.chain.PostChainEvents(events, logs)

			// Insert the block into the set of pending ones to wait for confirmations
			self.unconfirmed.Insert(block.NumberU64(), block.Hash())
		}
	}
}

worker.wait()會在一個channel處一直等待Agent完成挖掘傳送回來的新BlockWork物件。這個Block會被寫入資料庫,加入本地的區塊鏈試圖成為最新的鏈頭。而Wotk裡頭其實是各種收據資訊,最終也會寫入 reciept樹;注意,此時區塊中的所有交易,假設都已經被執行過了,所以這裡的操作,不會再去執行這些交易物件。

當這一切都完成,worker就會發送一條事件(NewMinedBlockEvent{}),等於通告天下:我挖出了一個新區塊!這樣監聽到該事件的其他節點,就會根據自身的狀況,來決定是否接受這個新區塊成為全網中公認的區塊鏈新的鏈頭。至於這個公認過程如何實現,就屬於共識演算法的範疇了。而傳送NewMinedBlockEvent 事件的動作是在該方法最底下的 一個函式呼叫中實現的 self.chain.PostChainEvents(events, logs)

我們再來看看 worker.commitNewWork():

他裡頭所做的事無非就是把需要打包成Block的一些相關資訊先封裝到Work物件中,其中裡頭有涉及到DAO硬分叉的特殊處理和把當前狀態置為最後狀態封裝到Work中的呼叫:

【注意】在挖礦過程中主要涉及Prepare() Finalize() Seal() 介面,三者的職責分別為
Prepare() 初始化新Block的Header
Finalize() 在執行完交易後,對Block進行修改(比如向礦工發放挖礦所得)
Seal() 實際的挖礦工作

最後通過push方法把Work傳遞給CpuAgent:

【注意】:commitNewWork()會在worker內部多處被呼叫,注意它每次都是被直接呼叫,並沒有以goroutine的方式啟動。commitNewWork()內部使用sync.Mutex對全部操作做了隔離

該函式主要做了以下操作:

  1. 準備新區塊的時間屬性Header.Time,一般均等於系統當前時間,不過要確保父區塊的時間(parentBlock.Time())要早於新區塊的時間,父區塊當然來自當前區塊鏈的鏈頭了。
  2. 建立新區塊的Header物件,其各屬性中:Num可確定(父區塊Num +1);Time可確定;ParentHash可確定;其餘諸如Difficulty,GasLimit等,均留待之後共識演算法中確定。
  3. 呼叫Engine.Prepare()函式,完成Header物件的準備。
  4. 根據新區塊的位置(Number),檢視它是否處於DAO硬分叉的影響範圍內,如果是,則賦值予header.Extra。
  5. 根據已有的Header物件,建立一個新的Work物件,並用其更新worker.current成員變數。
  6. 如果配置資訊中支援硬分叉,在Work物件的StateDB裡應用硬分叉。
  7. 準備新區塊的交易列表,來源是TxPool中那些最近加入的tx,並執行這些交易
  8. 準備新區塊的叔區塊uncles[],來源是worker.possibleUncles[],而possibleUncles[]中的每個區塊都從事件ChainSideEvent中搜集得到。注意叔區塊最多有兩個。
  9. 呼叫Engine.Finalize()函式,對新區塊“定型”,填充上Header.Root, TxHash, ReceiptHash, UncleHash等幾個屬性。
  10. 如果上一個區塊(即舊的鏈頭區塊)處於unconfirmedBlocks中,意味著它也是由本節點挖掘出來的,嘗試去驗證它已經被吸納進主幹鏈中。
  11. 把建立的Work物件,通過channel傳送給每一個登記過的Agent,進行後續的挖掘。

以上步驟中,4和6都是僅僅在該區塊配置中支援DAO硬分叉,並且該區塊的位置正好處於DAO硬分叉影響範圍內時才會發生;其他步驟是普遍性的。commitNewWork()完成了待挖掘區塊的組裝,block.Header建立完畢,交易陣列txs,叔區塊Uncles[]都已取得,並且由於所有交易被執行完畢,相應的Receipt[]也已獲得。萬事俱備,可以交給Agent進行‘挖掘’了。

我們再看看worker.push():

// push sends a new work task to currently live miner agents.
func (self *worker) push(work *Work) {
	if atomic.LoadInt32(&self.mining) != 1 {
		return
	}
	for agent := range self.agents {
		atomic.AddInt32(&self.atWork, 1)
		if ch := agent.Work(); ch != nil {
			ch <- work
		}
	}
}

就是這樣紙把封裝好的Work物件傳遞給了CpuAgent;

我們在往下看CpuAgent都做了什麼事:

首先我們會看到外層的miner的start函式會呼叫worker的start函式其實是呼叫了CpuAgent的start函式,而在CpuAgent的start中我們看到只調用了自己的update函式而已。

func (self *CpuAgent) Start() {
	if !atomic.CompareAndSwapInt32(&self.isMining, 0, 1) {
		return // agent already started
	}
	go self.update()
}

我們再看看Agent的update:

func (self *CpuAgent) update() {
out:
	for {
		select {
		case work := <-self.workCh:
			self.mu.Lock()
			if self.quitCurrentOp != nil {
				close(self.quitCurrentOp)
			}
			self.quitCurrentOp = make(chan struct{})
			go self.mine(work, self.quitCurrentOp)
			self.mu.Unlock()
		case <-self.stop:
			self.mu.Lock()
			if self.quitCurrentOp != nil {
				close(self.quitCurrentOp)
				self.quitCurrentOp = nil
			}
			self.mu.Unlock()
			break out
		}
	}
}

很顯然,裡頭只做了一直在監聽由worker發過來的Work物件和是否停止挖礦的訊號;如果間聽到了Work物件則啟動一非同步的mine函式去做挖礦動作。

我們再看Agent的mine函式:

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
	if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
		log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
		self.returnCh <- &Result{work, result}
	} else {
		if err != nil {
			log.Warn("Block sealing failed", "err", err)
		}
		self.returnCh <- nil
	}
}

會看到其實真正的挖礦是交由engine的實現類去做底層共識進行挖礦的【說白了就是求目標Hash】並且把打包好的Block和Work都組裝到Result中返回給worker。而上面我們說了worker會在worker.wait()中實時監聽返回的Result,分別提取出Block和Work去做區塊上鍊和時間廣播等操作。OK,那麼這就是老版本的挖礦邏輯;下面我們來看看新版本的挖礦邏輯是個什麼鬼;

由上面我們說了新版本,如:1.8.14已經把Agent移除了,所有挖礦的邏輯都在worker中完成,下面我們從miner入口來看看本次的變動:

我們可以看到在新版的miner的New函式中已經把Agent註冊的邏輯移除了

func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration) *Miner {
	miner := &Miner{
		eth:      eth,
		mux:      mux,
		engine:   engine,
		exitCh:   make(chan struct{}),
		worker:   newWorker(config, engine, eth, mux, recommit),
		canStart: 1,
	}
	go miner.update()

	return miner
}

我們直接過來看newWorker函式中的實現:

func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration) *worker {
	worker := &worker{
		config:             config,
		engine:             engine,
		eth:                eth,
		mux:                mux,
		chain:              eth.BlockChain(),
		possibleUncles:     make(map[common.Hash]*types.Block),
		unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
		txsCh:              make(chan core.NewTxsEvent, txChanSize),
		chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
		newWorkCh:          make(chan *newWorkReq),
		taskCh:             make(chan *task),
		resultCh:           make(chan *task, resultQueueSize),
		exitCh:             make(chan struct{}),
		startCh:            make(chan struct{}, 1),
		resubmitIntervalCh: make(chan time.Duration),
		resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
	}
	// Subscribe NewTxsEvent for tx pool
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
	// Subscribe events for blockchain
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

	// Sanitize recommit interval if the user-specified one is too short.
	if recommit < minRecommitInterval {
		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
		recommit = minRecommitInterval
	}

	go worker.mainLoop()
	go worker.newWorkLoop(recommit)
	go worker.resultLoop()
	go worker.taskLoop()

	// Submit first work to initialize pending state.
	worker.startCh <- struct{}{}

	return worker
}

從圖中我們可以看到,新版本的worker中新新增或者更改了以下幾個通道:

最底層有4個 非同步的協程【請注意這四個協程】:

其中 newWorkLoop 是一個一直監聽 startCh 中是否有挖礦訊號 (startCh的訊號有 start函式放置進去的):

在 go worker.newWorkLoop中:

func (w *worker) newWorkLoop(recommit time.Duration) {
	var (
		interrupt   *int32
		minRecommit = recommit // minimal resubmit interval specified by user.
	)

	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick

	// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	commit := func(noempty bool, s int32) {
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		interrupt = new(int32)
		w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty}
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}
	// recalcRecommit recalculates the resubmitting interval upon feedback.
	recalcRecommit := func(target float64, inc bool) {
		var (
			prev = float64(recommit.Nanoseconds())
			next float64
		)
		if inc {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
			// Recap if interval is larger than the maximum time interval
			if next > float64(maxRecommitInterval.Nanoseconds()) {
				next = float64(maxRecommitInterval.Nanoseconds())
			}
		} else {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
			// Recap if interval is less than the user specified minimum
			if next < float64(minRecommit.Nanoseconds()) {
				next = float64(minRecommit.Nanoseconds())
			}
		}
		recommit = time.Duration(int64(next))
	}

	for {
		select {
		case <-w.startCh:
			commit(false, commitInterruptNewHead)

		case <-w.chainHeadCh:
			commit(false, commitInterruptNewHead)

		case <-timer.C:
			// If mining is running resubmit a new work cycle periodically to pull in
			// higher priced transactions. Disable this overhead for pending blocks.
			if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) {
				// Short circuit if no new transaction arrives.
				if atomic.LoadInt32(&w.newTxs) == 0 {
					timer.Reset(recommit)
					continue
				}
				commit(true, commitInterruptResubmit)
			}

		case interval := <-w.resubmitIntervalCh:
			// Adjust resubmit interval explicitly by user.
			if interval < minRecommitInterval {
				log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
				interval = minRecommitInterval
			}
			log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
			minRecommit, recommit = interval, interval

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case adjust := <-w.resubmitAdjustCh:
			// Adjust resubmit interval by feedback.
			if adjust.inc {
				before := recommit
				recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
				log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
			} else {
				before := recommit
				recalcRecommit(float64(minRecommit.Nanoseconds()), false)
				log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
			}

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case <-w.exitCh:
			return
		}
	}
}

我們可以看到:

如果有 就提交一個挖礦作業 方法 commit()

我們又可以看到 commit 裡面其實是 構造了一個 挖礦的請求 實體而已,並且再把 請求實體交由  w.newWorkCh 通道【請記住這個通道】;

這是在 newWorkLoop中定義的內部匿名函式 commit:

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	commit := func(noempty bool, s int32) {
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		interrupt = new(int32)
		w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty}
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}

緊接著我們再來看 newWorker 初始化函式中的四個協程之一的   worker.mainLoop()

// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
	defer w.txsSub.Unsubscribe()
	defer w.chainHeadSub.Unsubscribe()
	defer w.chainSideSub.Unsubscribe()

	for {
		select {
		case req := <-w.newWorkCh:
			w.commitNewWork(req.interrupt, req.noempty)

		case ev := <-w.chainSideCh:
			if _, exist := w.possibleUncles[ev.Block.Hash()]; exist {
				continue
			}
			// Add side block to possible uncle block set.
			w.possibleUncles[ev.Block.Hash()] = ev.Block
			// If our mining block contains less than 2 uncle blocks,
			// add the new uncle block if valid and regenerate a mining block.
			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
				start := time.Now()
				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
					var uncles []*types.Header
					w.current.uncles.Each(func(item interface{}) bool {
						hash, ok := item.(common.Hash)
						if !ok {
							return false
						}
						uncle, exist := w.possibleUncles[hash]
						if !exist {
							return false
						}
						uncles = append(uncles, uncle.Header())
						return false
					})
					w.commit(uncles, nil, true, start)
				}
			}

		case ev := <-w.txsCh:
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
			if !w.isRunning() && w.current != nil {
				w.mu.RLock()
				coinbase := w.coinbase
				w.mu.RUnlock()

				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
					acc, _ := types.Sender(w.current.signer, tx)
					txs[acc] = append(txs[acc], tx)
				}
				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
				w.commitTransactions(txset, coinbase, nil)
				w.updateSnapshot()
			} else {
				// If we're mining, but nothing is being processed, wake on new transactions
				if w.config.Clique != nil && w.config.Clique.Period == 0 {
					w.commitNewWork(nil, false)
				}
			}
			atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

		// System stopped
		case <-w.exitCh:
			return
		case <-w.txsSub.Err():
			return
		case <-w.chainHeadSub.Err():
			return
		case <-w.chainSideSub.Err():
			return
		}
	}
}

我們就可以看到在裡面有對 newWorkLoop 中的那個commit函式中的 w.newWorkCh 通道,且在裡頭做實時的監聽,一發現有內容就會把它提交給一個叫做  w.commitNewWork(req.interrupt, req.noempty) 挖礦作業提交函式,我們再跟進去看,發現他無非就是做各種各樣的 block 預處理工作,到方法的末尾 交付給了  w.commit(uncles, w.fullTaskHook, true, tstart)

我們再跟進去 看看裡面是做了什麼

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
	// Deep copy receipts here to avoid interaction between different tasks.
	receipts := make([]*types.Receipt, len(w.current.receipts))
	for i, l := range w.current.receipts {
		receipts[i] = new(types.Receipt)
		*receipts[i] = *l
	}
	s := w.current.state.Copy()
	block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
	if err != nil {
		return err
	}
	if w.isRunning() {
		if interval != nil {
			interval()
		}
		select {
		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
			w.unconfirmed.Shift(block.NumberU64() - 1)

			feesWei := new(big.Int)
			for i, tx := range block.Transactions() {
				feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
			}
			feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))

			log.Info("Commit new mining work", "number", block.Number(), "uncles", len(uncles), "txs", w.current.tcount,
				"gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))

		case <-w.exitCh:
			log.Info("Worker has exited")
		}
	}
	if update {
		w.updateSnapshot()
	}
	return nil
}

我們發現 w.commit(uncles, w.fullTaskHook, true, tstart)  裡面是先對 收據資料先做一些處理然後把 構造好的 收據 receipts  狀態 state 及預先打包好的block (裡頭有些內容需要真正挖礦來回填的,比如 隨機數,出塊時間等等)  構造成一個任務實體 task 物件並提交給 w.taskCh 通道 【注意了 重頭戲來了】,以及呼叫 engine.Finalize 函式做當前狀態的封裝。

我們再回去看 newWorker 函式中四個協程的 worker.taskLoop()

在裡頭一直監聽這個 taskCh 通道過來的 task 實體,一有就啟動 seal() 函式把這些資訊 交由底層的共識層的實現去 做真正的挖礦

 我們跟進 go w.seal(task, stopCh) 裡頭看看做了什麼:

發現把之前預先構造好的 block 等都交由底層共識去做了,最終把挖出來的 block 通過  w.resultCh 通道返回,【這一步和以前用Agent 挖礦的邏輯的底層一致】

最後處理結果是那四個協程中的 go worker.resultLoop()  來處理的:

// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
	for {
		select {
		case result := <-w.resultCh:
			// Short circuit when receiving empty result.
			if result == nil {
				continue
			}
			// Short circuit when receiving duplicate result caused by resubmitting.
			block := result.block
			if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
				continue
			}
			// Update the block hash in all logs since it is now available and not when the
			// receipt/log of individual transactions were created.
			for _, r := range result.receipts {
				for _, l := range r.Logs {
					l.BlockHash = block.Hash()
				}
			}
			for _, log := range result.state.Logs() {
				log.BlockHash = block.Hash()
			}
			// Commit block and state to database.
			stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state)
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
			// Broadcast the block and announce chain insertion event
			w.mux.Post(core.NewMinedBlockEvent{Block: block})
			var (
				events []interface{}
				logs   = result.state.Logs()
			)
			switch stat {
			case core.CanonStatTy:
				events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
				events = append(events, core.ChainHeadEvent{Block: block})
			case core.SideStatTy:
				events = append(events, core.ChainSideEvent{Block: block})
			}
			w.chain.PostChainEvents(events, logs)

			// Insert the block into the set of pending ones to resultLoop for confirmations
			w.unconfirmed.Insert(block.NumberU64(), block.Hash())

		case <-w.exitCh:
			return
		}
	}
}

so 本次改動去除了 Agent這個代理類,全部動作都交由 newWorker 函式中的四個協程去實現,且相互之間通過 對應的通道來通訊!完美