1. 程式人生 > >死磕以太坊原始碼分析之MPT樹-下

死磕以太坊原始碼分析之MPT樹-下

> 死磕以太坊原始碼分析之MPT樹-下 > > 文章以及資料請檢視:https://github.com/blockchainGuide/ > [上篇](https://github.com/blockchainGuide/)主要介紹了以太坊中的MPT樹的原理,這篇主要會對MPT樹涉及的原始碼進行拆解分析。`trie`模組主要有以下幾個檔案: ```GO |-encoding.go 主要講編碼之間的轉換 |-hasher.go 實現了從某個結點開始計運算元樹的雜湊的功能 |-node.go 定義了一個Trie樹中所有結點的型別和解析的程式碼 |-sync.go 實現了SyncTrie物件的定義和所有方法 |-iterator.go 定義了所有列舉相關介面和實現 |-secure_trie.go 實現了SecureTrie物件 |-proof.go 為key構造一個merkle證明 |-trie.go Trie樹的增刪改查 |-database.go 對記憶體中的trie樹節點進行引用計數 ``` ## 實現概覽 ### encoding.go 這個主要是講三種編碼(`KEYBYTES encoding`、`HEX encoding`、`COMPACT encoding`)的實現與轉換,`trie`中全程都需要用到這些,該檔案中主要實現瞭如下功能: 1. hex編碼轉換為Compact編碼:`hexToCompact()` 2. Compact編碼轉換為hex編碼:`compactToHex()` 3. keybytes編碼轉換為Hex編碼:`keybytesToHex()` 4. hex編碼轉換為keybytes編碼:`hexToKeybytes()` 5. 獲取兩個位元組陣列的公共字首的長度:`prefixLen()` ```go func hexToCompact(hex []byte) []byte { terminator := byte(0) if hasTerm(hex) { //檢查是否有結尾為0x10 => 16 terminator = 1 //有結束標記16說明是葉子節點 hex = hex[:len(hex)-1] //去除尾部標記 } buf := make([]byte, len(hex)/2+1) // 位元組陣列 buf[0] = terminator << 5 // 標誌byte為00000000或者00100000 //如果長度為奇數,新增奇數位標誌1,並把第一個nibble位元組放入buf[0]的低四位 if len(hex)&1 == 1 { buf[0] |= 1 << 4 // 奇數標誌 00110000 buf[0] |= hex[0] // 第一個nibble包含在第一個位元組中 0011xxxx hex = hex[1:] } //將兩個nibble位元組合併成一個位元組 decodeNibbles(hex, buf[1:]) return buf ``` ```go //compact編碼轉化為Hex編碼 func compactToHex(compact []byte) []byte { base := keybytesToHex(compact) base = base[:len(base)-1] // apply terminator flag // base[0]包括四種情況 // 00000000 擴充套件節點偶數位 // 00000001 擴充套件節點奇數位 // 00000010 葉子節點偶數位 // 00000011 葉子節點奇數位 // apply terminator flag if base[0] >= 2 { //如果是葉子節點,末尾新增Hex標誌位16 base = append(base, 16) } // apply odd flag //如果是偶數位,chop等於2,否則等於1 chop := 2 - base[0]&1 return base[chop:] } ``` ```go //compact編碼轉化為Hex編碼 func compactToHex(compact []byte) []byte { base := keybytesToHex(compact) base = base[:len(base)-1] // apply terminator flag // base[0]包括四種情況 // 00000000 擴充套件節點偶數位 // 00000001 擴充套件節點奇數位 // 00000010 葉子節點偶數位 // 00000011 葉子節點奇數位 // apply terminator flag if base[0] >= 2 { //如果是葉子節點,末尾新增Hex標誌位16 base = append(base, 16) } // apply odd flag //如果是偶數位,chop等於2,否則等於1 chop := 2 - base[0]&1 return base[chop:] } ``` ```go // 將十六進位制的bibbles轉成key bytes,這隻能用於偶數長度的key func hexToKeybytes(hex []byte) []byte { if hasTerm(hex) { hex = hex[:len(hex)-1] } if len(hex)&1 != 0 { panic("can't convert hex key of odd length") } key := make([]byte, (len(hex)+1)/2) decodeNibbles(hex, key) return key } ``` ```go // 返回a和b的公共字首的長度 func prefixLen(a, b []byte) int { var i, length = 0, len(a) if len(b) < length { length = len(b) } for ; i < length; i++ { if a[i] != b[i] { break } } return i } ``` ### node.go #### 四種節點 node 介面分四種實現: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以帶有子節點。 ```go type ( fullNode struct { Children [17]node // 分支節點 flags nodeFlag } shortNode struct { //擴充套件節點 Key []byte Val node //可能指向葉子節點,也可能指向分支節點。 flags nodeFlag } hashNode []byte valueNode []byte // 葉子節點值,但是該葉子節點最終還是會包裝在shortNode中 ) ``` ### trie.go Trie物件實現了MPT樹的所有功能,包括(key, value)對的增刪改查、計算默克爾雜湊,以及將整個樹寫入資料庫中。 ### iterator.go `nodeIterator`提供了遍歷樹內部所有結點的功能。其結構如下:此結構體是在`trie.go`定義的 ```go type nodeIterator struct { trie.NodeIterator t *odrTrie err error } ``` 裡面包含了一個介面`NodeIterator`,它的實現則是由`iterator.go`來提供的,其方法如下: ```go func (it *nodeIterator) Next(descend bool) bool func (it *nodeIterator) Hash() common.Hash func (it *nodeIterator) Parent() common.Hash func (it *nodeIterator) Leaf() bool func (it *nodeIterator) LeafKey() []byte func (it *nodeIterator) LeafBlob() []byte func (it *nodeIterator) LeafProof() [][]byte func (it *nodeIterator) Path() []byte {} func (it *nodeIterator) seek(prefix []byte) error func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, error) func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) func (it *nodeIterator) pop() ``` `NodeIterator`的核心是`Next`方法,每呼叫一次這個方法,NodeIterator物件代表的當前節點就會更新至下一個節點,當所有結點遍歷結束,`Next`方法返回`false`。 生成NodeIterator結口的方法有以下3種: **①:Trie.NodeIterator(start []byte)** 通過`start`引數指定從哪個路徑開始遍歷,如果為`nil`則從頭到尾按順序遍歷。 **②:NewDifferenceIterator(a, b NodeIterator)** 當呼叫`NewDifferenceIterator(a, b NodeIterator)`後,生成的`NodeIterator`只遍歷存在於 b 但不存在於 a 中的結點。 **③:NewUnionIterator(iters []NodeIterator)** 當呼叫`NewUnionIterator(its []NodeIterator)`後,生成的`NodeIterator`遍歷的結點是所有傳入的結點的合集。 ### database.go `Database`是`trie`模組對真正資料庫的快取層,其目的是對快取的節點進行引用計數,從而實現區塊的修剪功能。主要方法如下: ```go func NewDatabase(diskdb ethdb.KeyValueStore) *Database func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database func (db *Database) DiskDB() ethdb.KeyValueReader func (db *Database) InsertBlob(hash common.Hash, blob []byte) func (db *Database) insert(hash common.Hash, blob []byte, node node) func (db *Database) insertPreimage(hash common.Hash, preimage []byte) func (db *Database) node(hash common.Hash) node func (db *Database) Node(hash common.Hash) ([]byte, error) func (db *Database) preimage(hash common.Hash) ([]byte, error) func (db *Database) secureKey(key []byte) []byte func (db *Database) Nodes() []common.Hash func (db *Database) Reference(child common.Hash, parent common.Hash) func (db *Database) Dereference(root common.Hash) func (db *Database) dereference(child common.Hash, parent common.Hash) func (db *Database) Cap(limit common.StorageSize) error func (db *Database) Commit(node common.Hash, report bool) error ``` ### security_trie.go 可以理解為加密了的`trie`的實現,`ecurity_trie`包裝了一下`trie`樹, 所有的`key`都轉換成`keccak256`演算法計算的`hash`值。同時在資料庫裡面儲存`hash`值對應的原始的`key`。 但是官方在程式碼裡也註釋了,這個程式碼不穩定,除了測試用例,別的地方並沒有使用該程式碼。 ### proof.go - Prove():根據給定的`key`,在`trie`中,將滿足`key`中最大長度字首的路徑上的節點都加入到`proofDb`(佇列中每個元素滿足:未編碼的hash以及對應`rlp`編碼後的節點) - VerifyProof():驗證`proffDb`中是否存在滿足輸入的`hash`,和對應key的節點,如果滿足,則返回`rlp`解碼後的該節點。 ## 實現細節 ### Trie物件的增刪改查 ①:**Trie樹的初始化** 如果`root`不為空,就通過`resolveHash`來載入整個`Trie`樹,如果為空,就新建一個`Trie`樹。 ```go func New(root common.Hash, db *Database) (*Trie, error) { if db == nil { panic("trie.New called without a database") } trie := &Trie{ db: db, } if root != (common.Hash{}) && root != emptyRoot { rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err } trie.root = rootnode } return trie, nil } ``` ②:**Trie樹的插入** 首先Trie樹的插入是個遞迴呼叫的過程,它會從根開始找,一直找到合適的位置插入。 ```go func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) ``` 引數說明: - n: 當前要插入的節點 - prefix: 當前已經處理完的**key**(節點共有的字首) - key: 未處理完的部分**key**,完整的`key = prefix + key` - value:需要插入的值 返回值說明: - bool : 操作是否改變了**Trie**樹(**dirty**) - Node :插入完成後的子樹的根節點 接下來就是分別對`shortNode`、`fullNode`、`hashNode`、`nil` 幾種情況進行說明。 **2.1:節點為nil** 空樹直接返回`shortNode`, 此時整顆樹的根就含有了一個`shortNode`節點。 ```GO case nil: return true, &shortNode{key, value, t.newFlag()}, nil ``` **2.2 :節點為shortNode** - 首先計算公共字首,如果公共字首就等於`key`,那麼說明這兩個`key`是一樣的,如果`value`也一樣的(`dirty == false`),那麼返回錯誤。 - 如果沒有錯誤就更新`shortNode`的值然後返回 - 如果公共字首不完全匹配,那麼就需要把公共字首提取出來形成一個獨立的節點(擴充套件節點),擴充套件節點後面連線一個`branch`節點,`branch`節點後面看情況連線兩個`short`節點。 - 首先構建一個branch節點(branch := &fullNode{flags: t.newFlag()}),然後再branch節點的Children位置呼叫t.insert插入剩下的兩個short節點 ```go matchlen := prefixLen(key, n.Key) if matchlen == len(n.Key) { dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value) if !dirty || err != nil { return false, n, err } return true, &shortNode{n.Key, nn, t.newFlag()}, nil } branch := &fullNode{flags: t.newFlag()} var err error _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val) if err != nil { return false, nil, err } _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value) if err != nil { return false, nil, err } if matchlen == 0 { return true, branch, nil } return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil ``` **2.3: 節點為fullNode** 節點是`fullNode`(也就是分支節點),那麼直接往對應的孩子節點呼叫`insert`方法,然後把對應的孩子節點指向新生成的節點。 ```go dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value) if !dirty || err != nil { return false, n, err } n = n.copy() n.flags = t.newFlag() n.Children[key[0]] = nn return true, n, nil ``` **2.4: 節點為hashnode** 暫時還在資料庫中的節點,先呼叫 `t.resolveHash(n, prefix)`來載入到記憶體,然後呼叫`insert`方法來插入。 ```go rn, err := t.resolveHash(n, prefix) if err != nil { return false, nil, err } dirty, nn, err := t.insert(rn, prefix, key, value) if !dirty || err != nil { return false, rn, err } return true, nn, nil ``` ③:**Trie樹查詢值** 其實就是根據輸入的`hash`,找到對應的葉子節點的資料。主要看`TryGet`方法。 引數: - `origNode`:當前查詢的起始**node**位置 - `key`:輸入要查詢的資料的**hash** - `pos`:當前**hash**匹配到第幾位 ```go func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) { switch n := (origNode).(type) { case nil: //表示當前trie是空樹 return nil, nil, false, nil case valueNode: ////這就是我們要查詢的葉子節點對應的資料 return n, n, false, nil case *shortNode: ////在葉子節點或者擴充套件節點匹配 if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) { return nil, n, false, nil } value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key)) if err == nil && didResolve { n = n.copy() n.Val = newnode } return value, n, didResolve, err case *fullNode://在分支節點匹配 value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1) if err == nil && didResolve { n = n.copy() n.Children[key[pos]] = newnode } return value, n, didResolve, err case hashNode: //說明當前節點是輕節點,需要從db中獲取 child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil, n, true, err } value, newnode, _, err := t.tryGet(child, key, pos) return value, newnode, true, err ... } ``` `didResolve`用於判斷`trie`樹是否會發生變化,`tryGet()`只是用來獲取資料的,當`hashNode`去`db`中獲取該`node`值後需要更新現有的trie,`didResolve`就會發生變化。其他就是基本的遞迴查詢樹操作。 ④:**Trie樹更新值** 更新值,其實就是呼叫insert方法進行操作。 到此Trie樹的增刪改查就講解的差不多了。 ### 將節點寫入到Trie的記憶體資料庫 如果要把節點寫入到記憶體資料庫,需要序列化,可以先去了解下以太坊的Rlp編碼。這部分工作由`trie.Commit()`完成,當`trie.Commit(nil)`,會執行序列化和快取等操作,序列化之後是使用的`Compact Encoding`進行編碼,從而達到節省空間的目的。 ```go func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { if t.db == nil { panic("commit called on trie with nil database") } hash, cached, err := t.hashRoot(t.db, onleaf) if err != nil { return common.Hash{}, err } t.root = cached return common.BytesToHash(hash.(hashNode)), nil } ``` 上述程式碼大概講了這些: - 每次執行`Commit()`,該trie的`cachegen`就會加 1 - `Commit()`方法返回的是`trie.root`所指向的`node`的`hash`(未編碼) - 其中的`hashRoot()`方法目的是`返回trie.root所指向的node的hash`以及`每個節點都帶有各自hash的trie樹的root`。 ```go //為每個node生成一個hash func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil, nil } h := newHasher(onleaf) defer returnHasherToPool(h) return h.hash(t.root, db, true) //為每個節點生成一個未編碼的hash } ``` 而`hashRoot`的核心方法就是 `h.hash`,它返回了頭節點的`hash`以及每個子節點都帶有`hash`的頭節點(Trie.root指向它),大致做了以下幾件事: ①:*如果我們不儲存節點,而只是雜湊,則從快取中獲取資料* ```go if hash, dirty := n.cache(); hash != nil { if db == nil { return hash, n, nil } if !dirty { switch n.(type) { case *fullNode, *shortNode: return hash, hash, nil default: return hash, n, nil } } } ``` ②:*遞迴呼叫`h.hashChildren`,求出所有的子節點的`hash`值,再把原有的子節點替換成現在子節點的`hash`值* **2.1:如果節點是`shortNode`** 首先把`collapsed.Key從Hex Encoding` 替換成 `Compact Encoding`, 然後遞迴呼叫`hash`方法計運算元節點的`hash`和`cache`,從而把子節點替換成了子節點的`hash`值 ```go collapsed, cached := n.copy(), n.copy() collapsed.Key = hexToCompact(n.Key) cached.Key = common.CopyBytes(n.Key) if _, ok := n.Val.(valueNode); !ok { collapsed.Val, cached.Val, err = h.hash(n.Val, db, false) if err != nil { return original, original, err } } return collapsed, cached, nil ``` **2.2:節點是fullNode** 遍歷每個子節點,把子節點替換成子節點的`Hash`值,否則的化這個節點沒有`children`。直接返回。 ```go collapsed, cached := n.copy(), n.copy() for i := 0; i < 16; i++ { if n.Children[i] != nil { collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false) if err != nil { return original, original, err } } } cached.Children[16] = n.Children[16] return collapsed, cached, nil ``` ③:*儲存節點n的雜湊值,如果我們指定了儲存層,它會寫對應的鍵/值對* store()方法主要就做了兩件事: - `rlp`序列化`collapsed`節點並將其插入db磁碟中 - 生成當前節點的`hash` - 將節點雜湊插入`db` **3.1:空資料或者hashNode,則不處理** ```go if _, isHash := n.(hashNode); n == nil || isHash { return n, nil } ``` **3.2:生成節點的RLP編碼** ```go h.tmp.Reset() // 快取初始化 if err := rlp.Encode(&h.tmp, n); err != nil { //將當前node序列化 panic("encode error: " + err.Error()) } if len(h.tmp) < 32 && !force { return n, nil // Nodes smaller than 32 bytes are stored inside their parent 編碼後的node長度小於32,若force為true,則可確保所有節點都被編碼 } //長度過大的,則都將被新計算出來的hash取代 hash, _ := n.cache() //取出當前節點的hash if hash == nil { hash = h.makeHashNode(h.tmp) //生成雜湊node } ``` **3.3:將Trie節點合併到中間記憶體快取中** ```go hash := common.BytesToHash(hash) db.lock.Lock() db.insert(hash, h.tmp, n) db.lock.Unlock() // Track external references from account->storage trie //跟蹤帳戶->儲存Trie中的外部引用 if h.onleaf != nil { switch n := n.(type) { case *shortNode: if child, ok := n.Val.(valueNode); ok { //指向的是分支節點 h.onleaf(child, hash) //用於統計當前節點的資訊,比如當前節點有幾個子節點,當前有效的節點數 } case *fullNode: for i := 0; i < 16; i++ { if child, ok := n.Children[i].(valueNode); ok { h.onleaf(child, hash) } } } } ``` 到此為止將節點寫入到`Trie`的記憶體資料庫就已經完成了。 *如果覺得文章不錯可以關注公眾號:**區塊鏈技術棧**,詳細的所有以太坊原始碼分析文章內容以及程式碼資料都在其中。* ### Trie樹快取機制 `Trie`樹的結構裡面有兩個引數, 一個是`cachegen`,一個是`cachelimit`。這兩個引數就是`cache`控制的引數。 `Trie`樹每一次呼叫`Commit`方法,會導致當前的`cachegen`增加1。 ```go func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { ... t.cachegen++ ... } ``` 然後在`Trie`樹插入的時候,會把當前的`cachegen`存放到節點中。 ```go func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) { .... return true, &shortNode{n.Key, nn, t.newFlag()}, nil } func (t *Trie) newFlag() nodeFlag { return nodeFlag{dirty: true, gen: t.cachegen} } ``` 如果 `trie.cachegen - node.cachegen > cachelimit`,就可以把節點從記憶體裡面拿掉。 也就是說節點經過幾次`Commit`,都沒有修改,那麼就把節點從記憶體裡面幹掉。 只要`trie`路徑上新增或者刪除一個節點,整個路徑的節點都需要重新例項化,也就是節點中的`nodeFlag`被初始化了。都需要重新更新到`db`磁碟。 拿掉節點過程在 `hasher.hash`方法中, 這個方法是在`commit`的時候呼叫。如果方法的`canUnload`方法呼叫返回真,那麼就拿掉節點,如果只返回了`hash`節點,而沒有返回`node`節點,這樣節點就沒有引用,不久就會被gc清除掉。 節點被拿掉之後,會用一個`hashNode`節點來表示這個節點以及其子節點。 如果後續需要使用,可以通過方法把這個節點載入到記憶體裡面來。 ```go func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) { .... // 從快取中解除安裝節點。它的所有子節點將具有較低或相等的快取世代號碼。 cacheUnloadCounter.Inc(1) ... } ``` ## 參考&總結 這部分重要的內容也就上面講述的,主要集中在`Trie`上面,如果有不對的地方,可以及時指正哦。 > https://mindcarver.cn/about/ > > https://github.com/blockchainGuide/blockcha