Fabric 1.0原始碼分析(2) blockfile(區塊檔案儲存)
Fabric 1.0原始碼筆記 之 blockfile(區塊檔案儲存)
1、blockfile概述
blockfile,即Fabric區塊鏈區塊檔案儲存,預設目錄/var/hyperledger/production/ledgersData/chains,含index和chains兩個子目錄。 其中index為索引目錄,採用leveldb實現。而chains為各ledger的區塊鏈檔案,子目錄以ledgerid為名,使用檔案系統實現。 區塊檔案以blockfile_為字首,最大大小預設64M。
blockfile,相關程式碼集中在common/ledger/blkstorage/fsblkstorage目錄,目錄結構如下:
- blockfile_mgr.go,blockfileMgr和checkpointInfo結構體及方法。
- block_stream.go,blockfileStream、blockStream、blockPlacementInfo結構體及方法。
- blockfile_rw.go,blockfileWriter和blockfileReader結構體及方法(blockfileReader未使用)。
- blockindex.go,index介面定義,index介面實現即blockIndex結構體及方法定義,以及blockIdxInfo、locPointer、fileLocPointer結構體及方法。
- blockfile_helper.go,定義了4個工具函式,constructCheckpointInfoFromBlockFiles、retrieveLastFileSuffix、isBlockFileName、getFileInfoOrPanic。 作用分別為:掃描最新的blockfile並重新構造檢查點資訊、獲取最新的檔案字尾、根據檔案字首判斷是否為區塊檔案、獲取檔案狀態資訊。
- block_serialization.go,block序列化相關工具函式。
- blocks_itr.go,blocksItr結構體及方法。
2、Block結構體定、以及Block序列化
2.1、Block相關結構體
Block結構體:
type Block struct {
Header *BlockHeader //BlockHeader
Data *BlockData //BlockData
Metadata *BlockMetadata
}
func (m *Block) GetHeader() *BlockHeader //獲取BlockHeader,即m.Header
func (m *Block) GetData() *BlockData //獲取BlockData,即m.Data
func (m *Block) GetMetadata() *BlockMetadata //m.Metadata
//程式碼在protos/common/common.pb.go
BlockHeader結構體:
type BlockHeader struct {
Number uint64 //區塊編號
PreviousHash []byte //前一個區塊雜湊
DataHash []byte //當前區塊雜湊
}
func (m *BlockHeader) GetNumber() uint64 //獲取區塊編號,即m.Number
func (m *BlockHeader) GetPreviousHash() []byte //獲取前一個區塊雜湊,即m.PreviousHash
func (m *BlockHeader) GetDataHash() []byte //獲取當前區塊雜湊,即m.DataHash
//程式碼在protos/common/common.pb.go
BlockData結構體:
type BlockData struct {
Data [][]byte //Data,儲存交易資訊
}
func (m *BlockData) GetData() [][]byte //獲取Data,即m.Data
//程式碼在protos/common/common.pb.go
BlockMetadata結構體:
type BlockMetadata struct {
Metadata [][]byte //K/V均為[]byte格式
}
func (m *BlockMetadata) GetMetadata() [][]byte //m.Metadata
//程式碼在protos/common/common.pb.go
補充BlockMetadataIndex:
type BlockMetadataIndex int32
const (
BlockMetadataIndex_SIGNATURES BlockMetadataIndex = 0
BlockMetadataIndex_LAST_CONFIG BlockMetadataIndex = 1
BlockMetadataIndex_TRANSACTIONS_FILTER BlockMetadataIndex = 2
BlockMetadataIndex_ORDERER BlockMetadataIndex = 3
)
2.2、Block序列化
serializedBlockInfo結構體定義及工具函式:
type serializedBlockInfo struct {
blockHeader *common.BlockHeader //BlockHeader
txOffsets []*txindexInfo //交易索引資訊
metadata *common.BlockMetadata
}
type txindexInfo struct {
txID string //交易ID
loc *locPointer //檔案指標
}
//序列化區塊,返回序列化後位元組,以及serializedBlockInfo(含BlockHeader和交易索引資訊)
func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error)
//反序列化區塊,構建Block結構體
func deserializeBlock(serializedBlockBytes []byte) (*common.Block, error)
//反序列化區塊,並構造serializedBlockInfo
func extractSerializedBlockInfo(serializedBlockBytes []byte) (*serializedBlockInfo, error)
//序列化中新增BlockHeader,即Number、DataHash和PreviousHash
func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error
//序列化中新增BlockData,並從BlockData中解析txid,返回交易索引資訊陣列
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error)
//序列化中新增Metadata
func addMetadataBytes(blockMetadata *common.BlockMetadata, buf *proto.Buffer) error
//反序列化出BlockHeader
func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error)
//反序列化出BlockData,並返回交易索引資訊陣列
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error)
//反序列化出Metadata
func extractMetadata(buf *ledgerutil.Buffer) (*common.BlockMetadata, error)
//從BlockData中解析出交易ID
func extractTxID(txEnvelopBytes []byte) (string, error)
//程式碼在common/ledger/blkstorage/fsblkstorage/block_serialization.go
3、checkpointInfo結構體定義及方法
checkpointInfo,即檢查點資訊,結構體定義如下:
type checkpointInfo struct {
latestFileChunkSuffixNum int //最新的區塊檔案字尾,如blockfile_000000
latestFileChunksize int //最新的區塊檔案大小
isChainEmpty bool //是否空鏈
lastBlockNumber uint64 //最新的區塊編號
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
涉及方法如下:
func (i *checkpointInfo) marshal() ([]byte, error) //checkpointInfo序列化
func (i *checkpointInfo) unmarshal(b []byte) error //checkpointInfo反序列化
func (i *checkpointInfo) String() string //轉換為string
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
4、blockfileStream相關結構體及方法
4.1、blockfileStream
blockfileStream定義如下:
type blockfileStream struct {
fileNum int //blockfile檔案字尾
file *os.File //os.File
reader *bufio.Reader //bufio.Reader
currentOffset int64 //當前偏移量
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go
涉及方法如下:
//構造blockfileStream
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error)
func (s *blockfileStream) nextBlockBytes() ([]byte, error) //下一個塊,調取s.nextBlockBytesAndPlacementInfo()
//下一個塊和位置資訊
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error)
func (s *blockfileStream) close() error //關閉blockfileStream
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) 程式碼如下:
var lenBytes []byte
var err error
var fileInfo os.FileInfo
moreContentAvailable := true
fileInfo, err = s.file.Stat() //獲取檔案狀態
remainingBytes := fileInfo.Size() - s.currentOffset //檔案讀取剩餘位元組
peekBytes := 8
if remainingBytes < int64(peekBytes) { //剩餘位元組小於8,按實際剩餘位元組,否則按8
peekBytes = int(remainingBytes)
moreContentAvailable = false
}
//儲存形式:前n位儲存block長度length,之後length位為實際block
lenBytes, err = s.reader.Peek(peekBytes) //Peek 返回快取的一個切片,該切片引用快取中前 peekBytes 個位元組的資料
length, n := proto.DecodeVarint(lenBytes) //從切片中讀取 varint 編碼的整數,它返回整數和被消耗的位元組數。
err = s.reader.Discard(n) //丟棄儲存block長度length的前n位
blockBytes := make([]byte, length)
_, err = io.ReadAtLeast(s.reader, blockBytes, int(length))
blockPlacementInfo := &blockPlacementInfo{
fileNum: s.fileNum,
blockStartOffset: s.currentOffset,
blockBytesOffset: s.currentOffset + int64(n)}
s.currentOffset += int64(n) + int64(length)
return blockBytes, blockPlacementInfo, nil
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go
補充blockPlacementInfo:塊位置資訊
type blockPlacementInfo struct {
fileNum int //塊檔案字尾
blockStartOffset int64 //n+length,n之前
blockBytesOffset int64 //n+length,length之前
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_stream.go
5、blockfileWriter結構體定義及方法
type blockfileWriter struct {
filePath string //路徑
file *os.File //os.File
}
func newBlockfileWriter(filePath string) (*blockfileWriter, error) //構造blockfileWriter,並呼叫writer.open()
func (w *blockfileWriter) truncateFile(targetSize int) error //擷取檔案
func (w *blockfileWriter) append(b []byte, sync bool) error //追加檔案
func (w *blockfileWriter) open() error //開啟檔案
func (w *blockfileWriter) close() error //關閉檔案
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_rw.go
6、blockIndex相關結構體及方法
6.1、index介面定義
type index interface {
getLastBlockIndexed() (uint64, error) //獲取最後一個塊索引(或編號)
indexBlock(blockIdxInfo *blockIdxInfo) error //索引區塊
getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) //根據區塊雜湊,獲取檔案區塊指標
getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) //根據區塊編號,獲取檔案區塊指標
getTxLoc(txID string) (*fileLocPointer, error) //根據交易ID,獲取檔案交易指標
getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) //根據區塊編號和交易編號,獲取檔案交易指標
getBlockLocByTxID(txID string) (*fileLocPointer, error)//根據交易ID,獲取檔案區塊指標
getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)//根據交易ID,獲取交易驗證程式碼
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
6.2、blockIndex結構體
blockIndex結構體定義如下:
type blockIndex struct {
indexItemsMap map[blkstorage.IndexableAttr]bool //index屬性對映
db *leveldbhelper.DBHandle //index leveldb操作
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
補充IndexableAttr:
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum")
IndexableAttrBlockTxID = IndexableAttr("BlockTxID")
IndexableAttrTxValidationCode = IndexableAttr("TxValidationCode")
)
//程式碼在common/ledger/blkstorage/blockstorage.go
涉及方法如下:
//構造blockIndex
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex
//獲取最後一個塊索引(或編號),取key為"indexCheckpointKey"的值,即為最新的區塊編號
func (index *blockIndex) getLastBlockIndexed() (uint64, error)
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error //索引區塊
//根據區塊雜湊,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
//根據區塊編號,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
//根據交易ID,獲取檔案交易指標
func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error)
//根據交易ID,獲取檔案區塊指標
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error)
//根據區塊編號和交易編號,獲取檔案交易指標
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
//根據交易ID,獲取交易驗證程式碼
func (index *blockIndex) getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
補充blockIdxInfo結構體定義:塊索引資訊。
type blockIdxInfo struct {
blockNum uint64 //區塊編號
blockHash []byte //區塊雜湊
flp *fileLocPointer //檔案指標
txOffsets []*txindexInfo //交易索引資訊
metadata *common.BlockMetadata
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
補充fileLocPointer、txindexInfo和common.BlockMetadata:
type locPointer struct { //定義指標
offset int //偏移位置
bytesLength int //位元組長度
}
type fileLocPointer struct { //定義檔案指標
fileSuffixNum int //檔案字尾
locPointer //嵌入locPointer
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
type txindexInfo struct { //交易索引資訊
txID string //交易ID
loc *locPointer //檔案指標
}
//程式碼在common/ledger/blkstorage/fsblkstorage/block_serialization.go
type BlockMetadata struct {
Metadata [][]byte `protobuf:"bytes,1,rep,name=metadata,proto3" json:"metadata,omitempty"`
}
//程式碼在protos/common/common.pb.go
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error程式碼如下:
flp := blockIdxInfo.flp //檔案指標
txOffsets := blockIdxInfo.txOffsets //交易索引資訊
txsfltr := ledgerUtil.TxValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) //type TxValidationFlags []uint8
batch := leveldbhelper.NewUpdateBatch() //leveldb批量更新
flpBytes, err := flp.marshal() //檔案指標序列化,含檔案字尾、偏移位置、位元組長度
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok { //使用區塊雜湊索引檔案區塊指標
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes) //區塊雜湊,blockHash:flpBytes存入leveldb
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok { //使用區塊編號索引檔案區塊指標
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes) //區塊編號,blockNum:flpBytes存入leveldb
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok { //使用交易ID索引檔案交易指標
for _, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
txFlpBytes, marshalErr := txFlp.marshal()
batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes) //交易ID,txID:txFlpBytes存入leveldb
}
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok { //使用區塊編號和交易編號索引檔案交易指標
for txIterator, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
txFlpBytes, marshalErr := txFlp.marshal()
batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)), txFlpBytes) //區塊編號和交易編號,blockNum+txIterator:txFlpBytes
}
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok { //使用交易ID索引檔案區塊指標
for _, txoffset := range txOffsets {
batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes) //交易ID,txID:flpBytes
}
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; ok { //使用交易ID索引交易驗證程式碼
for idx, txoffset := range txOffsets {
batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
}
}
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum)) //key為"indexCheckpointKey"的值,即為最新的區塊編號
err := index.db.WriteBatch(batch, true) //批量更新
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockindex.go
7、blocksItr結構體及方法
type blocksItr struct {
mgr *blockfileMgr //blockfileMgr
maxBlockNumAvailable uint64 //最大的區塊編號
blockNumToRetrieve uint64 //起始區塊編號
stream *blockStream //blockStream
closeMarker bool
closeMarkerLock *sync.Mutex
}
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr //構造blocksItr
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64
func (itr *blocksItr) initStream() error
func (itr *blocksItr) shouldClose() bool
func (itr *blocksItr) Next() (ledger.QueryResult, error)
func (itr *blocksItr) Close()
//程式碼在common/ledger/blkstorage/fsblkstorage/blocks_itr.go
8、blockfileMgr結構體定義及方法
blockfileMgr結構體定義:
type blockfileMgr struct {
rootDir string //ledger檔案儲存目錄,如/var/hyperledger/production/ledgersData/chains/chains/mychannel
conf *Conf //即type Conf struct,存放路徑和區塊檔案大小
db *leveldbhelper.DBHandle //用於操作index
index index //type index interface,其實現為blockIndex結構體
cpInfo *checkpointInfo //type checkpointInfo struct
cpInfoCond *sync.Cond //定期喚醒鎖
currentFileWriter *blockfileWriter //type blockfileWriter struct
bcInfo atomic.Value //原子操作
}
//程式碼在common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
涉及方法如下:
//構建blockfileMgr
func newBlockfileMgr