1. 程式人生 > >Fabric 1.0原始碼分析(2) blockfile(區塊檔案儲存)

Fabric 1.0原始碼分析(2) blockfile(區塊檔案儲存)

Fabric 1.0原始碼筆記 之 blockfile(區塊檔案儲存)

1、blockfile概述

blockfile,即Fabric區塊鏈區塊檔案儲存,預設目錄/var/hyperledger/production/ledgersData/chains,含index和chains兩個子目錄。 其中index為索引目錄,採用leveldb實現。而chains為各ledger的區塊鏈檔案,子目錄以ledgerid為名,使用檔案系統實現。 區塊檔案以blockfile_為字首,最大大小預設64M。

blockfile,相關程式碼集中在common/ledger/blkstorage/fsblkstorage目錄,目錄結構如下:

  • blockfile_mgr.go,blockfileMgr和checkpointInfo結構體及方法。
  • block_stream.go,blockfileStream、blockStream、blockPlacementInfo結構體及方法。
  • blockfile_rw.go,blockfileWriter和blockfileReader結構體及方法(blockfileReader未使用)。
  • blockindex.go,index介面定義,index介面實現即blockIndex結構體及方法定義,以及blockIdxInfo、locPointer、fileLocPointer結構體及方法。
  • blockfile_helper.go,定義了4個工具函式,constructCheckpointInfoFromBlockFiles、retrieveLastFileSuffix、isBlockFileName、getFileInfoOrPanic。 作用分別為:掃描最新的blockfile並重新構造檢查點資訊、獲取最新的檔案字尾、根據檔案字首判斷是否為區塊檔案、獲取檔案狀態資訊。
  • block_serialization.go,block序列化相關工具函式。
  • blocks_itr.go,blocksItr結構體及方法。

2、Block結構體定、以及Block序列化

2.1、Block相關結構體

Block結構體:

type Block struct {
    Header   *BlockHeader //BlockHeader
    Data     *BlockData //BlockData
    Metadata *BlockMetadata
}

func (m *Block) GetHeader() *BlockHeader //獲取BlockHeader,即m.Header
func (m *Block) GetData() *BlockData //獲取BlockData,即m.Data func (m *Block) GetMetadata() *BlockMetadata //m.Metadata //程式碼在protos/common/common.pb.go

BlockHeader結構體:

type BlockHeader struct {
    Number       uint64 //區塊編號
    PreviousHash []byte //前一個區塊雜湊
    DataHash     []byte //當前區塊雜湊
}

func (m *BlockHeader) GetNumber() uint64 //獲取區塊編號,即m.Number
func (m *BlockHeader) GetPreviousHash() []byte //獲取前一個區塊雜湊,即m.PreviousHash
func (m *BlockHeader) GetDataHash() []byte //獲取當前區塊雜湊,即m.DataHash
//程式碼在protos/common/common.pb.go

BlockData結構體:

type BlockData struct {
    Data [][]byte //Data,儲存交易資訊
}

func (m *BlockData) GetData() [][]byte //獲取Data,即m.Data
//程式碼在protos/common/common.pb.go

BlockMetadata結構體:

type BlockMetadata struct {
    Metadata [][]byte //K/V均為[]byte格式
}

func (m *BlockMetadata) GetMetadata() [][]byte //m.Metadata
//程式碼在protos/common/common.pb.go

補充BlockMetadataIndex:

type BlockMetadataIndex int32

const (
    BlockMetadataIndex_SIGNATURES          BlockMetadataIndex = 0
    BlockMetadataIndex_LAST_CONFIG         BlockMetadataIndex = 1
    BlockMetadataIndex_TRANSACTIONS_FILTER BlockMetadataIndex = 2
    BlockMetadataIndex_ORDERER             BlockMetadataIndex = 3
)

2.2、Block序列化

serializedBlockInfo結構體定義及工具函式:

type serializedBlockInfo struct {
    blockHeader *common.BlockHeader //BlockHeader
    txOffsets   []*txindexInfo //交易索引資訊
    metadata    *common.BlockMetadata
}

type txindexInfo struct {
    txID string //交易ID
    loc  *locPointer //檔案指標
}

//序列化區塊,返回序列化後位元組,以及serializedBlockInfo(含BlockHeader和交易索引資訊)
func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error)
//反序列化區塊,構建Block結構體
func deserializeBlock(serializedBlockBytes []byte) (*common.Block, error)
//反序列化區塊,並構造serializedBlockInfo
func extractSerializedBlockInfo(serializedBlockBytes []byte) (*serializedBlockInfo, error)
//序列化中新增BlockHeader,即Number、DataHash和PreviousHash
func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error
//序列化中新增BlockData,並從BlockData中解析txid,返回交易索引資訊陣列
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error)
//序列化中新增Metadata
func addMetadataBytes(blockMetadata *common.BlockMetadata, buf *proto.Buffer) error
//反序列化出BlockHeader
func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error)
//反序列化出BlockData,並返回交易索引資訊陣列
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error)
//反序列化出Metadata
func extractMetadata(buf *ledgerutil.Buffer) (*common.BlockMetadata, error)
//從BlockData中解析出交易ID
func extractTxID(txEnvelopBytes []byte) (string, error)
//程式碼在common/ledger/blkstorage/fsblkstorage/block_serialization.go

3、checkpointInfo結構體定義及方法

checkpointInfo,即檢查點資訊,結構體定義如下:

type checkpointInfo struct {
    latestFileChunkSuffixNum int //最新的區塊檔案字尾,如blockfile_000000
    latestFileChunksize      int //最新的區塊檔案大小
    isChainEmpty             bool //是否空鏈
    lastBlockNumber          uint64 //最新的區塊編號
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go

涉及方法如下:

func (i *checkpointInfo) marshal() ([]byte, error) //checkpointInfo序列化
func (i *checkpointInfo) unmarshal(b []byte) error //checkpointInfo反序列化
func (i *checkpointInfo) String() string //轉換為string
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go

4、blockfileStream相關結構體及方法

4.1、blockfileStream

blockfileStream定義如下:

type blockfileStream struct {
    fileNum       int //blockfile檔案字尾
    file          *os.File //os.File
    reader        *bufio.Reader //bufio.Reader
    currentOffset int64 //當前偏移量
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go

涉及方法如下:

//構造blockfileStream
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) 
func (s *blockfileStream) nextBlockBytes() ([]byte, error) //下一個塊,調取s.nextBlockBytesAndPlacementInfo()
//下一個塊和位置資訊
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) 
func (s *blockfileStream) close() error //關閉blockfileStream
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go

func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) 程式碼如下:

var lenBytes []byte
var err error
var fileInfo os.FileInfo
moreContentAvailable := true

fileInfo, err = s.file.Stat() //獲取檔案狀態
remainingBytes := fileInfo.Size() - s.currentOffset //檔案讀取剩餘位元組
peekBytes := 8
if remainingBytes < int64(peekBytes) { //剩餘位元組小於8,按實際剩餘位元組,否則按8
    peekBytes = int(remainingBytes)
    moreContentAvailable = false
}
//儲存形式:前n位儲存block長度length,之後length位為實際block
lenBytes, err = s.reader.Peek(peekBytes) //Peek 返回快取的一個切片,該切片引用快取中前 peekBytes 個位元組的資料
length, n := proto.DecodeVarint(lenBytes) //從切片中讀取 varint 編碼的整數,它返回整數和被消耗的位元組數。
    err = s.reader.Discard(n) //丟棄儲存block長度length的前n位
    blockBytes := make([]byte, length)
    _, err = io.ReadAtLeast(s.reader, blockBytes, int(length))
    blockPlacementInfo := &blockPlacementInfo{
        fileNum:          s.fileNum,
        blockStartOffset: s.currentOffset,
        blockBytesOffset: s.currentOffset + int64(n)}
    s.currentOffset += int64(n) + int64(length)
    return blockBytes, blockPlacementInfo, nil
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go

補充blockPlacementInfo:塊位置資訊

type blockPlacementInfo struct {
    fileNum          int //塊檔案字尾
    blockStartOffset int64 //n+length,n之前
    blockBytesOffset int64 //n+length,length之前
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go

5、blockfileWriter結構體定義及方法

type blockfileWriter struct {
    filePath string //路徑
    file     *os.File //os.File
}

func newBlockfileWriter(filePath string) (*blockfileWriter, error) //構造blockfileWriter,並呼叫writer.open()
func (w *blockfileWriter) truncateFile(targetSize int) error //擷取檔案
func (w *blockfileWriter) append(b []byte, sync bool) error //追加檔案
func (w *blockfileWriter) open() error //開啟檔案
func (w *blockfileWriter) close() error //關閉檔案
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_rw.go

6、blockIndex相關結構體及方法

6.1、index介面定義

type index interface {
    getLastBlockIndexed() (uint64, error) //獲取最後一個塊索引(或編號)
    indexBlock(blockIdxInfo *blockIdxInfo) error //索引區塊
    getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) //根據區塊雜湊,獲取檔案區塊指標
    getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) //根據區塊編號,獲取檔案區塊指標
    getTxLoc(txID string) (*fileLocPointer, error) //根據交易ID,獲取檔案交易指標
    getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) //根據區塊編號和交易編號,獲取檔案交易指標
    getBlockLocByTxID(txID string) (*fileLocPointer, error)//根據交易ID,獲取檔案區塊指標
    getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)//根據交易ID,獲取交易驗證程式碼
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

6.2、blockIndex結構體

blockIndex結構體定義如下:

type blockIndex struct {
    indexItemsMap map[blkstorage.IndexableAttr]bool //index屬性對映
    db            *leveldbhelper.DBHandle //index leveldb操作
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

補充IndexableAttr:

const (
    IndexableAttrBlockNum         = IndexableAttr("BlockNum")
    IndexableAttrBlockHash        = IndexableAttr("BlockHash")
    IndexableAttrTxID             = IndexableAttr("TxID")
    IndexableAttrBlockNumTranNum  = IndexableAttr("BlockNumTranNum")
    IndexableAttrBlockTxID        = IndexableAttr("BlockTxID")
    IndexableAttrTxValidationCode = IndexableAttr("TxValidationCode")
)
//程式碼在common/ledger/blkstorage/blockstorage.go

涉及方法如下:

//構造blockIndex
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex 
//獲取最後一個塊索引(或編號),取key為"indexCheckpointKey"的值,即為最新的區塊編號
func (index *blockIndex) getLastBlockIndexed() (uint64, error) 
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error //索引區塊
//根據區塊雜湊,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) 
//根據區塊編號,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) 
//根據交易ID,獲取檔案交易指標
func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) 
//根據交易ID,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error) 
//根據區塊編號和交易編號,獲取檔案交易指標
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) 
//根據交易ID,獲取交易驗證程式碼
func (index *blockIndex) getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) 
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

補充blockIdxInfo結構體定義:塊索引資訊。

type blockIdxInfo struct {
    blockNum  uint64 //區塊編號
    blockHash []byte //區塊雜湊
    flp       *fileLocPointer //檔案指標
    txOffsets []*txindexInfo //交易索引資訊
    metadata  *common.BlockMetadata 
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

補充fileLocPointer、txindexInfo和common.BlockMetadata:

type locPointer struct { //定義指標
    offset      int //偏移位置
    bytesLength int //位元組長度
}
type fileLocPointer struct { //定義檔案指標
    fileSuffixNum int //檔案字尾
    locPointer //嵌入locPointer
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

type txindexInfo struct { //交易索引資訊
    txID string //交易ID
    loc  *locPointer //檔案指標
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_serialization.go

type BlockMetadata struct {
    Metadata [][]byte `protobuf:"bytes,1,rep,name=metadata,proto3" json:"metadata,omitempty"`
}
//程式碼在protos/common/common.pb.go

func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error程式碼如下:

flp := blockIdxInfo.flp //檔案指標
txOffsets := blockIdxInfo.txOffsets //交易索引資訊
txsfltr := ledgerUtil.TxValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) //type TxValidationFlags []uint8
batch := leveldbhelper.NewUpdateBatch() //leveldb批量更新
flpBytes, err := flp.marshal() //檔案指標序列化,含檔案字尾、偏移位置、位元組長度

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok { //使用區塊雜湊索引檔案區塊指標
    batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes) //區塊雜湊,blockHash:flpBytes存入leveldb
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok { //使用區塊編號索引檔案區塊指標
    batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes) //區塊編號,blockNum:flpBytes存入leveldb
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok { //使用交易ID索引檔案交易指標
    for _, txoffset := range txOffsets {
        txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
        txFlpBytes, marshalErr := txFlp.marshal()
        batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes) //交易ID,txID:txFlpBytes存入leveldb
    }
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok { //使用區塊編號和交易編號索引檔案交易指標
    for txIterator, txoffset := range txOffsets {
        txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
        txFlpBytes, marshalErr := txFlp.marshal()
        batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)), txFlpBytes) //區塊編號和交易編號,blockNum+txIterator:txFlpBytes
    }
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok { //使用交易ID索引檔案區塊指標
    for _, txoffset := range txOffsets {
        batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes) //交易ID,txID:flpBytes
    }
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; ok { //使用交易ID索引交易驗證程式碼
    for idx, txoffset := range txOffsets {
        batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
    }
}

batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum)) //key為"indexCheckpointKey"的值,即為最新的區塊編號
err := index.db.WriteBatch(batch, true) //批量更新
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go

7、blocksItr結構體及方法

type blocksItr struct {
    mgr                  *blockfileMgr //blockfileMgr
    maxBlockNumAvailable uint64 //最大的區塊編號
    blockNumToRetrieve   uint64 //起始區塊編號
    stream               *blockStream //blockStream
    closeMarker          bool
    closeMarkerLock      *sync.Mutex
}

func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr //構造blocksItr
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 
func (itr *blocksItr) initStream() error 
func (itr *blocksItr) shouldClose() bool 
func (itr *blocksItr) Next() (ledger.QueryResult, error) 
func (itr *blocksItr) Close() 
//程式碼在common/ledger/blkstorage/fsblkstorage/blocks_itr.go

8、blockfileMgr結構體定義及方法

blockfileMgr結構體定義:

type blockfileMgr struct {
    rootDir           string //ledger檔案儲存目錄,如/var/hyperledger/production/ledgersData/chains/chains/mychannel
    conf              *Conf //即type Conf struct,存放路徑和區塊檔案大小
    db                *leveldbhelper.DBHandle //用於操作index
    index             index //type index interface,其實現為blockIndex結構體
    cpInfo            *checkpointInfo //type checkpointInfo struct
    cpInfoCond        *sync.Cond //定期喚醒鎖
    currentFileWriter *blockfileWriter //type blockfileWriter struct
    bcInfo            atomic.Value //原子操作
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go

涉及方法如下:

//構建blockfileMgr
func newBlockfileMgr