1. 程式人生 > >以太坊 --- 交易池的特點 與 中斷恢復

以太坊 --- 交易池的特點 與 中斷恢復

> 作者:林冠巨集 / 指尖下的幽靈。轉載者,請: 務必標明出處。 > 部落格:http://www.cnblogs.com/linguanh/ > 掘金:https://juejin.im/user/1785262612681997 > GitHub : https://github.com/af913337456/ > 出版的書籍: * 《1.0-區塊鏈以太坊DApp開發實戰》 * 《2.0-區塊鏈DApp開發:基於以太坊和比特幣公鏈》 --- # 目錄 * 前序 * 以太坊交易池`知識點`總結 * 原始碼探祕 * 本地交易 * 本地錢包地址的初始化 * 載入本地交易 * pool.journal.load * pool.AddLocals * 本地交易檔案的更新 * 遠端交易 * P2P 通訊模組的初始化 * 接收 P2P 訊息 * 新增遠端交易到交易池 * “ 彩蛋 ” --- 21年的第一篇文章,開源寫作6年。 最近`比特幣`和`以太坊`的價格也已然起飛,現在一個 BTC 已能全款輛`某斯拉 model 3`汽車。離譜。 釋出這篇文章:從區塊鏈技術研發者的角度,說說我的區塊鏈從業經歷和對它的理解 的時候,是去年,現在回首去看最後那段話,一語成讖。 --- 言歸正傳。 一般做`資料池`之類的開發。比如:訂單池,請求池...,`傳統的服務端思想`會引導我們直接向`訊息中介軟體`想去。使用各類`訊息元件`去實現,比如 RocketMQ,Redis,Kafka... 然而,在區塊鏈公鏈應用中,現已知的多條公鏈,每一條,都有`交易池`這麼一個功能模組,且,它們的程式碼實現都沒有引入訊息中介軟體去實現。 早前在閱讀以太坊公鏈原始碼的時候,我就對以太坊交易池這一塊的實現思想感到新穎,今天總結下,分享給大家看看,區塊鏈公鏈應用中不依賴訊息中介軟體去實現交易池的做法及其特點。 --- 以太坊交易池`知識點`總結 _(BTW:面試的時候可死記): 1. 交易的分類: * 從本地檔案存與不存的角度去看: 1. 本地交易,若交易的傳送者地址是`配置變數`指定的地址,則認為是本地交易: * `節點啟動的時候,可以在配置檔案指定,不開啟本地交易的操作`。 2. 遠端交易,不滿足 1 條件的交易。 * 從記憶體儲存的角度去看: 1. Queue,待進入 Pending 的交易,結構是 `map[addr]TxList`; 2. Pending,待進入打包佇列的交易,結構和 Queue 一樣,由 1 轉化而來。 2. 交易的輸入(產生): * 程式啟動之初: 1. 本地交易,從本地檔案載入到記憶體,本地若沒,自然是 0 輸入; 2. 遠端交易,由 P2P 通訊模組,接收到交易資料,儲存到記憶體。 * 程式執行中: 1. 自己接收交易的 `RPC請求`,SendTransaction 或 SendRawTransaction; 2. 通過 P2P 通訊模組,接收其它節點的資訊,包含的動作有: 1. 舊交易的移除; 2. 新交易的增加。 3. 交易的持久化策略: * 本地交易: 1. `定時`從 Pending 和 Queue 中選出本地交易`儲存到本地檔案`; 2. 儲存方式,檔案替換,`先 new` 一個,`再 rename` 一波; 3. 注意第 2 點,檔案的替換,意味著`即是更新`,`也是刪除`操作; 4. 編碼方式,`rlp 編碼`,不是 json。 * 遠端交易: 1. 不存,不進行持久化,總是依賴由其它節點 P2P 通訊同步過來。 4. 中斷恢復: 1. 本地交易,同上面 `程式啟動之初` 的操作; 2. 遠端交易,沒有恢復,記憶體中的交易丟了就是丟了,不影響。即使當初正在打包,即使當前節點掛了,其它節點還在工作。 上面第 4 點,`中斷恢復`,對比於傳統後端服務的訊息中介軟體,對訊息的不丟失保障性,區塊鏈公鏈的做法,完全是靠分散式來維持的,單節點的資料丟失,可以從其它節點同步過來。所以,它們交易池的實現的實現,相對來說,更加靈活,編碼難點在訊息同步部分。 --- #### 下面進入枯燥的原始碼分析階段,讀有餘力的讀者可以繼續 要看註釋。 ## 本地交易 ### 1. 本地錢包地址的初始化 `原始碼檔案:tx_pool.go`,config.Locals 由配置檔案指定,是以太坊錢包地址陣列。 ```go func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { ... for _, addr := range config.Locals { // 從配置檔案新增 本地地址 log.Info("Setting new local account", "address", addr) // 新增到 locals 變數裡面,後面會用它來過濾出一個地址是否是本地地址 pool.locals.add(addr) } ... } ``` ### 2. 從本地檔案,載入交易資料資料,即載入本地交易 ```go func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { ... pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) pool.locals.add(addr) } ... // 上面新增完了 // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { // 如果配置開啟了本地載入的需求 pool.journal = newTxJournal(config.Journal) // load 是載入函式,pool.AddLocals 是實際新增函式 if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate transaction journal", "err", err) } } ... go pool.loop() // 迴圈處理事件 } ``` ### 3. pool.journal.load `原始碼檔案:tx_journal.go` ```go func (journal *txJournal) load(add func([]*types.Transaction) []error) error { // Skip the parsing if the journal file doesn't exist at all if _, err := os.Stat(journal.path); os.IsNotExist(err) { return nil } // Open the journal for loading any past transactions input, err := os.Open(journal.path) // 開啟檔案,讀取流資料 if err != nil { return err } ... stream := rlp.NewStream(input, 0) // 使用 rlp 編碼演算法解碼資料 ... loadBatch := func(txs types.Transactions) { for _, err := range add(txs) { // 呼叫 add 函式,進行新增 if err != nil { log.Debug("Failed to add journaled transaction", "err", err) dropped++ } } } // loadBatch 在下面會被呼叫 ... } ``` ### 4. pool.AddLocals `pool.AddLocals` 是實際的新增函式。內部的一系列呼叫後,最終到 tx_pool.add 函式。pool 的 queue 都是 map 結構,能根據相同 key 去重。 ```go func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { ... // 下面的 if,如果已在 pool.pending 裡面,那麼證明之前已經新增過在 queue 裡 if list := pool.pending[from]; list != nil && list.Overlaps(tx) { ... pool.journalTx(from, tx) // 內部呼叫 journal.insert return old != nil, nil } replaced, err = pool.enqueueTx(hash, tx) // 這裡,會新增到 pool.enqueue 裡面 if err != nil { return false, err } pool.journalTx(from, tx) // 內部呼叫 journal.insert ... } func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { // 本地錢包地址,沒指定的話,就跳過 if pool.journal == nil || !pool.locals.contains(from) { return } // insert 會在造成重複新增,但是 load 出來的時候會根據 addr 去重 if err := pool.journal.insert(tx); err != nil { log.Warn("Failed to journal local transaction", "err", err) } } ``` 截止到上面,本地交易已經被新增到 pool 的 queue 裡面了。 節點啟動之初,除了會從本地 load 交易到 queue 外,還會不停地監聽鏈的事件,比如接收交易,再 add 交易 到 queue 裡。 ### 5. 本地交易檔案的更新 ( 插入 / 刪除 ) loop 是觸發的入口。除了主動的 journal.insert 達到了插入本地交易的目的之外。 下面的更新操作,也達到了包含插入的目的:`以替換的手段,從檔案刪除舊交易,儲存新交易到檔案` ```go func (pool *TxPool) loop() { ... for { select { ... // Handle local transaction journal rotation // journal 定時器,定時執行下面的本地交易資料檔案的更新 journal.rotate case <-journal.C: if pool.journal != nil { pool.mu.Lock() if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate local tx journal", "err", err) } pool.mu.Unlock() } } } } ``` journal.rotate 的做法,使用檔案替換的方式,來從 pool 的交易 pending 和 queue 中儲存 locals 錢包地址相關的交易到檔案。注意,只存本地錢包地址的,其它的,不存。 ```go //輸入 func (pool *TxPool) local() map[common.Address]types.Transactions { ... for addr := range pool.locals.accounts { if pending := pool.pending[addr]; pending != nil { // 新增 pending 的 txs[addr] = append(txs[addr], pending.Flatten()...) } if queued := pool.queue[addr]; queued != nil { // 新增 queue 的 txs[addr] = append(txs[addr], queued.Flatten()...) } } return txs } // all 引數,來源於上面 local() func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error { ... // journal.path+".new" 字尾 .new replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755) if err != nil { return err } journaled := 0 for _, txs := range all { for _, tx := range txs { if err = rlp.Encode(replacement, tx); err != nil { replacement.Close() return err } } journaled += len(txs) } replacement.Close() // rename,重新命名檔案到原始的 path,達到更新,替換目的 if err = os.Rename(journal.path+".new", journal.path); err != nil { return err } sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755) if err != nil { return err } ... return nil } ``` ## 遠端交易 ### P2P 通訊模組的初始化 `原始碼檔案:eth/backend.go` ```go func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ... if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } // 初始化交易池 eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) ... // 使用 交易池指標物件 作為引數初始化 protocolManager if eth.protocolManager, err = NewProtocolManager( chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, `eth.txPool`, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil { return nil, err } ... return eth, nil } func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { // 下面初始化 tx_fetcher,使用 txpool.AddRemotes 賦值給函式變數 addTxs manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx) } ``` ### 接收 P2P 訊息 `原始碼檔案:eth/handler.go` ```go func (pm *ProtocolManager) handleMsg(p *peer) error { ... switch { ... // 接收到其它節點的交易資料 case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65): ... // Enqueue 將交易新增到交易池 pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg) } ... } // tx_fetcher.go 檔案 func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { ... errs := f.addTxs(txs) // 執行新增,這個函式其實就是 tx_pool.go 的 AddRemotes ... } ``` ### 新增遠端交易到交易池 ```go // tx_pool.go // addTxs 內部就會把交易新增到 Pending 和 Queue 裡面 func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { return pool.addTxs(txs, false, false) } ``` # 打完收工 更多以太坊的開發知識,見我的書籍: 《2.0-區塊鏈DApp開發:基於以太坊和比特幣