1. 程式人生 > >死磕以太坊原始碼分析之txpool

死磕以太坊原始碼分析之txpool

> 死磕以太坊原始碼分析之txpool > > 請結合以下程式碼閱讀:https://github.com/blockchainGuide/ > > 寫文章不易,也希望大家多多指出問題,交個朋友,混個圈子哦 ## 交易池概念原理 交易池工作概況: ![image-20201225104748102](https://tva1.sinaimg.cn/large/0081Kckwgy1glzwre4v4ej31120tcgpa.jpg) 1. 交易池的資料來源主要來自: - 本地提交,也就是第三方應用通過呼叫本地以太坊節點的`RPC`服務所提交的交易; - 遠端同步,是指通過廣播同步的形式,將其他以太坊節點的交易資料同步至本地節點; 2. 交易池中交易去向:被Miner模組獲取並驗證,用於挖礦;挖礦成功後寫進區塊並被廣播 3. `Miner`取走交易是複製,交易池中的交易並不減少。直到交易被寫進規範鏈後才從交易池刪除; 4. 交易如果被寫進分叉,交易池中的交易也不減少,等待重新打包。 ## 關鍵資料結構 ### TxPoolConfig ```go type TxPoolConfig struct { Locals []common.Address // 本地賬戶地址存放 NoLocals bool // 是否開啟本地交易機制 Journal string // 本地交易存放路徑 Rejournal time.Duration // 持久化本地交易的間隔 PriceLimit uint64 // 價格超出比例,若想覆蓋一筆交易的時候,若價格上漲比例達不到要求,那麼不能覆蓋 PriceBump uint64 // 替換現有交易的最低價格漲幅百分比(一次) AccountSlots uint64 // 每個賬戶的可執行交易限制 GlobalSlots uint64 // 全部賬戶最大可執行交易 AccountQueue uint64 // 單個賬戶不可執行的交易限制 GlobalQueue uint64 // 全部賬戶最大非執行交易限制 Lifetime time.Duration // 一個賬戶在queue中的交易可以存活的時間 } ``` 預設配置: > ```go > Journal: "transactions.rlp", > Rejournal: time.Hour, > > PriceLimit: 1, > PriceBump: 10, > > AccountSlots: 16, > GlobalSlots: 4096, > AccountQueue: 64, > GlobalQueue: 1024, > > Lifetime: 3 * time.Hour > ``` ### TxPool ```go type TxPool struct { config TxPoolConfig // 交易池配置 chainconfig *params.ChainConfig // 區塊鏈配置 chain blockChain // 定義blockchain介面 gasPrice *big.Int txFeed event.Feed //時間流 scope event.SubscriptionScope // 訂閱範圍 signer types.Signer //簽名 mu sync.RWMutex istanbul bool // Fork indicator whether we are in the istanbul stage. currentState *state.StateDB // 當前頭區塊對應的狀態 pendingNonces *txNoncer // Pending state tracking virtual nonces currentMaxGas uint64 // Current gas limit for transaction caps locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop } ``` ## txpool初始化 `Txpool`初始化主要做了以下幾件事: ①:檢查配置 配置有問題則用預設值填充 ```go config = (&config).sanitize() ``` 對於這部分的檢查檢視`TxPoolConfig`的欄位。 ②:初始化本地賬戶 ```go pool.locals = newAccountSet(pool.signer) ``` ③:將配置的本地賬戶地址加到交易池 ```go pool.locals.add(addr) ``` 我們在安裝以太坊客戶端可以指定一個數據儲存目錄,此目錄便會儲存著所有我們匯入的或者通過本地客戶端建立的帳戶`keystore`檔案。而這個載入過程便是從該目錄載入帳戶資料 ④:更新交易池 ```go pool.reset(nil, chain.CurrentBlock().Header()) ``` ⑤:建立所有交易儲存的列表,所有交易的價格用最小堆存放 ```go pool.priced = newTxPricedList(pool.all) ``` 通過排序,優先處理`gasprice`越高的交易。 ⑥:如果本地交易開啟 那麼從本地磁碟載入本地交易 ```go if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate transaction journal", "err", err) } } ``` ⑦:訂閱鏈上事件訊息 ```go pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) ``` ⑧:開啟主迴圈 ```go go pool.loop() ``` > 注意:local交易比remote交易具有更高的許可權,一是不輕易被替換;二是持久化,即通過一個本地的journal檔案儲存尚未打包的local交易。所以在節點啟動的時候,優先從本地載入local交易。 > > 本地地址會被加入白名單,凡由此地址傳送的交易均被認為是local交易,不論是從本地遞交還是從遠端傳送來的。 到此為止交易池載入過程結束。 ## 新增交易到txpool 之前我們說過交易池中交易的來源一方面是其他節點廣播過來的,一方面是本地提交的,追根到原始碼一個是`AddLocal`,一個是`AddRemote`,不管哪個都會呼叫`addTxs`。我們對新增交易的討論就會從這個函式開始,它主要做了以下幾件事,先用一張簡圖說明一下: ![image-20201225104721173](https://tva1.sinaimg.cn/large/0081Kckwgy1glzxhi23euj31ak0u0h34.jpg) 1. 過濾池中已經存在的交易 ```go if pool.all.Get(tx.Hash()) != nil { errs[i] = fmt.Errorf("known transaction: %x", tx.Hash()) knownTxMeter.Mark(1) continue } ``` 2. 將交易新增到佇列中 ```go newErrs, dirtyAddrs := pool.addTxsLocked(news, local) ``` ```go 進入到addTxsLocked函式中: replaced, err := pool.add(tx, local) ``` 進入到 `pool.add`函式中,這個`add`函式相當重要,它是將交易新增到`queue`中,等待後面的promote,到`pending`中去。如果在`queue`或者`pending`中已經存在,並且它的gas price更高時,將覆蓋之前的交易。下面來拆開的分析一下add 這個函式。 ①:看交易是否收到過,如果已經收到過就丟棄 ```GO if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) knownTxMeter.Mark(1) return false, fmt.Errorf("known transaction: %x", hash) } ``` ②:如果交易沒通過驗證也要丟棄,這裡的重點是驗證函式: ```go validateTx: 主要做了以下幾件事 - 交易大小不能超過32kb - 交易金額不能為負 - 交易gas值不能超出當前交易池設定的gaslimit - 交易簽名必須正確 - 如果交易為遠端交易,則需驗證其gasprice是否小於交易池gasprice最小值,如果是本地,優先打包,不管gasprice - 判斷當前交易nonce值是否過低 - 交易所需花費的轉帳手續費是否大於帳戶餘額 cost == V + GP * GL - 判斷交易花費gas是否小於其預估花費gas ``` ③:如果交易池已滿,丟棄價格過低的交易 ```go if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { if !local && pool.priced.Underpriced(tx, pool.locals) { ... } drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { ... pool.removeTx(tx.Hash(), false) } } ``` 注意這邊的`GlobalSlots`和`GlobalQueue` ,就是我們說的`pending`和`queue`的最大容量,如果交易池的交易數超過兩者之和,就要丟棄價格過低的交易。 ④:判斷當前交易在pending佇列中是否存在`nonce`值相同的交易。存在則判斷當前交易所設定的`gasprice`是否超過設定的`PriceBump`百分比,超過則替換覆蓋已存在的交易,否則報錯返回`替換交易gasprice過低`,並且把它扔到`queue`佇列中`(enqueueTx)`。 ```go if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardMeter.Mark(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } // New transaction isn't replacing a pending one, push into queue replaced, err = pool.enqueueTx(hash, tx) ``` 新增交易的流程就到此為止了。接下來就是如何把`queue`(暫時不可執行)中新增的交易扔到`pending`(可執行交易)中,速成`promote`。 3. 提升交易 提升交易主要把交易從`queue`扔到`pending`中,我們在接下來的裡面重點講 ```go done := pool.requestPromoteExecutables(dirtyAddrs) ``` ## 交易升級 `promoteExecutables`將`future queue`中的交易移動到`pending`中,同時也會刪除很多無效交易比如`nonce`低或者餘額低等等,主要分以下步驟:先看張圖: ![image-20201225104612253](https://tva1.sinaimg.cn/large/0081Kckwgy1glzxix54vaj313m0si4d2.jpg) ①:將所有`queue`中`nonce`低於賬戶當前`nonce`的交易從`all`裡面刪除 ```go forwards := list.Forward(pool.currentState.GetNonce(addr)) for _, tx := range forwards { hash := tx.Hash() pool.all.Remove(hash) log.Trace("Removed old queued transaction", "hash", hash) } ``` ②:將所有`queue`中花費大於賬戶餘額 或者`gas`大於限制的交易從all裡面刪除 ```go drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) log.Trace("Removed unpayable queued transaction", "hash", hash) } ``` ③:將所有可執行的交易從`queue`裡面移到`pending`裡面(`proteTx`) 注:可執行交易:將`pending`裡面`nonce`值大於等於賬戶當前狀態`nonce`的,且`nonce`連續的幾筆交易作為準備好的交易 ```go readies := list.Ready(pool.pendingNonces.get(addr)) for _, tx := range readies { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { log.Trace("Promoting queued transaction", "hash", hash) promoted = append(promoted, tx) } } ``` 重點就是 **promoteTx**的處理,這個方法與add的不同之處在於,`addTx`是獲得到的**新交易插入pending**,而`promoteTx`是將**queue列表中的Txs放入pending**接下來我們先看看裡面是如何來處理的: ```go inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this // 老的交易更好,刪除這個交易 pool.all.Remove(hash) pool.priced.Removed(1) pendingDiscardMeter.Mark(1) return false } // Otherwise discard any previous transaction and mark this // 現在這個交易更好,刪除舊的交易 if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } else { ... } ``` 主要就做了這幾件事: 1. 將交易插入`pending`中,如果待插入的交易`nonce`在`pending`列表中存在,那麼待插入的交易`gas price`大於或等於原交易價值的`110%(`跟`pricebump`設定有關)時,替換原交易 2. 如果新交易替換了某個交易,從`all`列表中刪除老交易 3. 最後更新一下`all`列表 經過`proteTx`之後,要扔到`pending`的交易都放在了`promoted []*types.Transaction`中,再回到`promoteExecutables`中,繼續下面步驟: ④:如果非本地賬戶`queue`大於限制(`AccountQueue`),從最後取出`nonce`較大的交易進行`remove` ```GO if !pool.locals.contains(addr) { caps = list.Cap(int(pool.config.AccountQueue)) for _, tx := range caps { hash := tx.Hash() pool.all.Remove(hash) ... } ``` ⑤:最後如果佇列中此賬戶的交易為空則刪除此賬戶 ```go if list.Empty() { delete(pool.queue, addr) } ``` 到此我們的升級交易要做的事情就完畢了。 ------ ## 交易降級 交易降級的幾個場景: 1. 出現了新的區塊,將會從`pending`中移除出現在區塊中的交易到`queue`中 2. 或者是另外一筆交易(`gas price` 更高),則會從`pending`中移除到`queue`中 關鍵函式:demoteUnexecutables,主要做的事情如下: ①:遍歷`pending`中所有地址對應的交易列表 ```go for addr, list := range pool.pending { ...} ``` ②:刪除所有認為過舊的交易(`low nonce`) ```go olds := list.Forward(nonce) for _, tx := range olds { hash := tx.Hash() pool.all.Remove(hash) log.Trace("Removed old pending transaction", "hash", hash) } ``` ③:刪除所有費用過高的交易(餘額低或用盡),並將所有無效者送到`queue`中以備後用 ```go drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) pool.all.Remove(hash) } pool.priced.Removed(len(olds) + len(drops)) pendingNofundsMeter.Mark(int64(len(drops))) for _, tx := range invalids { hash := tx.Hash() log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } ``` ④:如果交易前面有間隙,將後面的交易移到`queue`中 ```go if list.Len() > 0 && list.txs.Get(nonce) == nil { gapped := list.Cap(0) for _, tx := range gapped { hash := tx.Hash() log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } pendingGauge.Dec(int64(len(gapped))) } ``` 注:間隙的出現通常是因為交易餘額問題導致的。假如原規範鏈 A 上交易m花費10,分叉後該賬戶又在分叉鏈B發出一個交易m花費20,這就導致該賬戶餘額本來可以支付A鏈上的某筆交易,但在B鏈上可能就不夠了。這個餘額不足的交易在B如果是n+3,那麼在A鏈上n+2,n+4號交易之間就出現了空隙,這就導致從n+3開始往後所有的交易都要降級; 到此為止交易降級結束。 ----- ## 重置交易池 ------- **重置交易池**將檢索區塊鏈的當前狀態(主要由於更新導致鏈狀態變化),並確保交易池的內容對於鏈狀態而言是有效的。 `reset`的呼叫時機如下: 1. `TxPool`初始化的過程:`NewTxPool`; 2. `TxPool`事件監聽`go`程收到規範鏈更新事件 流程圖如下: ![image-20201015185551752](https://tva1.sinaimg.cn/large/007S8ZIlgy1gjq7vc6bz8j31260sodlq.jpg) 根據上面流程圖,主要功能是由於規範鏈的更新,重新整理交易池: ①:*如果老區塊頭不為空 且老區塊頭不是新區塊的父區塊,說明新老區塊不在一條鏈上* ```go if oldHead != nil && oldHead.Hash() != newHead.ParentHash {} ``` ②:*如果新頭區塊和舊頭區塊相差大於64,則所有交易不必回退到交易池* ```go if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) } ``` ③:*如果舊鏈的頭區塊大於新鏈的頭區塊高度,舊鏈向後退並回收所有回退的交易* ```go for rem.NumberU64() > add.NumberU64() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return } } ``` ④:*如果新鏈的頭區塊大於舊鏈的頭區塊,新鏈後退並回收交易* ```go for add.NumberU64() > rem.NumberU64() { included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) return } } ``` ⑤:*當新舊鏈到達同一高度的時候同時回退,知道找到共同的父節點* ```go for rem.Hash() != add.Hash() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return } included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) return } } ``` ⑥:*給交易池設定最新的世界狀態* ```go statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset txpool state", "err", err) return } pool.currentState = statedb pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit ``` ⑦:*把舊鏈回退的交易放入交易池* ```go senderCacher.recover(pool.signer, reinject) pool.addTxsLocked(reinject, false) ``` 到此整個`reset`的流程就結束了。 ----- > 參考: > > https://mindcarver.cn/ > > https://github.com/mindcarver/blockchain_guide > > https://learnblockchain.cn/2019/06/03/eth-txpool/#%E6%B8%85%E7%90%86%E4%BA%A4%E6%98%93%E6%B1%A0 > > https://blog.csdn.net/lj900911/article/details/8