兄弟連區塊鏈教程以太坊原始碼分析chain-indexer區塊鏈索引一
chain_indexer 區塊鏈索引
chain_indexer.go 原始碼解析
chain_indexer 顧名思義, 就是用來給區塊鏈建立索引的功能。 之前在eth協議的時候,介紹過BloomIndexer的功能,其實BloomIndexer是chain_indexer的一個特殊的實現, 可以理解為派生類, 主要的功能其實實在chain_indexer這裡面實現的。雖說是派生類,但是chain_indexer其實就只被BloomIndexer使用。也就是給區塊鏈的布隆過濾器建立了索引,以便快速的響應使用者的日誌搜尋功能。 下面就來分析這部分的程式碼。
資料結構
// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
// ChainIndexerBackend定義了處理區塊鏈片段的方法,並把處理結果寫入資料庫。 這些可以用來建立布隆過濾器或者CHTs.
// BloomIndexer 其實就是實現了這個介面 ChainIndexerBackend 這裡的CHTs不知道是什麼東西。
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). // Reset 方法用來初始化一個新的區塊鏈片段,可能會終止任何沒有完成的操作。 Reset(section uint64) // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. // 對區塊鏈片段中的下一個區塊頭進行處理。 呼叫者將確保區塊頭的連續順序。 Process(header *types.Header) // Commit finalizes the section metadata and stores it into the database. 完成區塊鏈片段的元資料並將其儲存到資料庫中。 Commit() error
}
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainEventLoop in a goroutine.
// ChainIndexer 對區塊鏈進行 大小相等的片段 進行處。 ChainIndexer在ChainEventLoop方法中通過事件系統與區塊鏈通訊,
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
//更遠可以新增使用父section索引器的輸出的更多子鏈式索引器。 這些子鏈式索引器只有在整個部分完成後或在可能影響已完成部分的回滾的情況下才接收新的頭部通知。
type ChainIndexer struct {
chainDb ethdb.Database // Chain database to index the data from 區塊鏈所在的資料庫 indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into 索引儲存的資料庫 backend ChainIndexerBackend // Background processor generating the index data content 索引生成的後端。 children []*ChainIndexer // Child indexers to cascade chain updates to子索引 active uint32 // Flag whether the event loop was started update chan struct{} // Notification channel that headers should be processed 接收到的headers quit chan chan error // Quit channel to tear down running goroutines sectionSize uint64 // Number of blocks in a single chain segment to process section的大小。 預設是4096個區塊為一個section confirmsReq uint64 // Number of confirmations before processing a completed segment 處理完成的段之前的確認次數 storedSections uint64 // Number of sections successfully indexed into the database 成功索引到資料庫的部分數量 knownSections uint64 // Number of sections known to be complete (block wise) 已知完成的部分數量 cascadedHead uint64 // Block number of the last completed section cascaded to subindexers 級聯到子索引的最後一個完成部分的塊號 throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources 磁碟限制,以防止大量資源的大量升級 log log.Logger lock sync.RWMutex
}
建構函式NewChainIndexer,
這個方法是在eth/bloombits.go裡面被呼叫的
const (
// bloomConfirms is the number of confirmation blocks before a bloom section is // considered probably final and its rotated bits are calculated. // bloomConfirms 用來表示確認區塊數量, 表示經過這麼多區塊之後, bloom section被認為是已經不會更改了。 bloomConfirms = 256 // bloomThrottling is the time to wait between processing two consecutive index // sections. It's useful during chain upgrades to prevent disk overload. // bloomThrottling是處理兩個連續索引段之間的等待時間。 在區塊鏈升級過程中防止磁碟過載是很有用的。 bloomThrottling = 100 * time.Millisecond
)
func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
backend := &BloomIndexer{ db: db, size: size, } // 可以看到indexDb和chainDb實際是同一個資料庫, 但是indexDb的每個key前面附加了一個BloomBitsIndexPrefix的字首。 table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix)) return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
}
// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.
func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{ chainDb: chainDb, indexDb: indexDb, backend: backend, update: make(chan struct{}, 1), quit: make(chan chan error), sectionSize: section, confirmsReq: confirm, throttling: throttling, log: log.New("type", kind), } // Initialize database dependent fields and start the updater c.loadValidSections() go c.updateLoop() return c
}
loadValidSections,用來從資料庫裡面載入我們之前的處理資訊, storedSections表示我們已經處理到哪裡了。
// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func (c *ChainIndexer) loadValidSections() {
data, _ := c.indexDb.Get([]byte("count")) if len(data) == 8 { c.storedSections = binary.BigEndian.Uint64(data[:]) }
}
updateLoop,是主要的事件迴圈,用於呼叫backend來處理區塊鏈section,這個需要注意的是,所有的主索引節點和所有的 child indexer 都會啟動這個goroutine 方法。
func (c *ChainIndexer) updateLoop() {
var ( updating bool updated time.Time ) for { select { case errc :=c.storedSections { // 如果當前以知的Section 大於已經儲存的Section // Periodically print an upgrade log message to the user // 每隔8秒列印一次日誌資訊。 if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { updating = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() } // Cache the current section count and head to allow unlocking the mutex section := c.storedSections var oldHead common.Hash if section > 0 { // section - 1 代表section的下標是從0開始的。 // sectionHead用來獲取section的最後一個區塊的hash值。 oldHead = c.sectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() // 處理 返回新的section的最後一個區塊的hash值 newHead, err := c.processSection(section, oldHead) if err != nil { c.log.Error("Section processing failed", "error", err) } c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed if err == nil && oldHead == c.sectionHead(section-1) { c.setSectionHead(section, newHead) // 更新資料庫的狀態 c.setValidSections(section + 1) // 更新資料庫狀態 if c.storedSections == c.knownSections && updating { updating = false c.log.Info("Finished upgrading chain index") } // cascadedHead 是更新後的section的最後一個區塊的高度 // 用法是什麼 ? c.cascadedHead = c.storedSections*c.sectionSize - 1 for _, child := range c.children { c.log.Trace("Cascading chain index update", "head", c.cascadedHead) child.newHead(c.cascadedHead, false) } } else { //如果處理失敗,那麼在有新的通知之前不會重試。 // If processing failed, don't retry until further notification c.log.Debug("Chain index processing failed", "section", section, "err", err) c.knownSections = c.storedSections } } // If there are still further sections to process, reschedule // 如果還有section等待處理,那麼等待throttling時間再處理。避免磁碟過載。 if c.knownSections > c.storedSections { time.AfterFunc(c.throttling, func() { select { case c.update <- struct{}{}: default: } }) } c.lock.Unlock() } }
}
未完待續…