
Hyperledger Fabric, Part 5: Consensus, Ordering, and Reading the Source Code

1. The Consensus Mechanism

Reaching consensus takes three phases: transaction endorsement, transaction ordering, and transaction validation.

  • Transaction endorsement: endorsing peers simulate the transaction (nothing is committed at this stage)
  • Transaction ordering: the order of transactions is fixed, and the ordered transactions are finally packaged into blocks and distributed
  • Transaction validation: each transaction is validated once more before the block is stored
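
As a mental model of these three phases only, here is a toy pipeline in Go; every type and function name below is hypothetical and is not a Fabric API:

package main

import "fmt"

// Hypothetical types for illustration only; these are not Fabric APIs.
type Tx struct{ ID string }
type Block struct{ Txs []Tx }

// Phase 1: endorsing peers simulate the transaction and sign the result (no state change yet).
func endorse(tx Tx) Tx { fmt.Println("endorsed:", tx.ID); return tx }

// Phase 2: the orderer fixes a global order and packs the transactions into a block.
func order(txs []Tx) Block { return Block{Txs: txs} }

// Phase 3: committing peers validate every transaction before the block is stored.
func validate(b Block) bool { return len(b.Txs) > 0 }

func main() {
	txs := []Tx{endorse(Tx{ID: "tx1"}), endorse(Tx{ID: "tx2"})}
	block := order(txs)
	if validate(block) {
		fmt.Println("block committed with", len(block.Txs), "transactions")
	}
}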

2. The Role of the orderer Node

  • Transaction ordering
    1. Purpose: guarantee the system's eventual consistency (a finite-state-machine model)
    2. solo: single-node ordering
    3. kafka: an external distributed message queue
  • Block distribution
    1. The blocks held by the orderer are not the blocks that are ultimately persisted
    2. They are blocks in an intermediate state
    3. They contain all transactions, valid or invalid, and are packaged and delivered to each organization's anchor peer
  • Multi-channel data isolation
    1. A client sends transactions over a specific channel (see the sketch after this list)
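
To illustrate the isolation point, here is a hedged sketch of where a client-built envelope names its target channel; the channel name is hypothetical and the signing step is omitted:

package main

import (
	cb "github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/utils"
)

// newEnvelope shows where the target channel lives: in the ChannelHeader of the
// payload. "mychannel" is a hypothetical channel name; real clients also sign
// the envelope, which is omitted here.
func newEnvelope(txBytes []byte) *cb.Envelope {
	chdr := &cb.ChannelHeader{
		Type:      int32(cb.HeaderType_ENDORSER_TRANSACTION),
		ChannelId: "mychannel",
	}
	payload := &cb.Payload{
		Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(chdr)},
		Data:   txBytes, // the marshaled transaction data, prepared elsewhere
	}
	// The orderer routes the envelope by chdr.ChannelId, which keeps each
	// channel's transactions and blocks separate.
	return &cb.Envelope{Payload: utils.MarshalOrPanic(payload)}
}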

3. The Source Tree

  • Read the source in GoLand

  • Directory layout
    1. bccsp: everything cryptographic (encryption, digital signatures, certificates); the cryptographic functions are abstracted into interfaces, which makes them easy to call and to extend
    2. bddtests: behavior-driven development, going straight from requirements to development
    3. common: shared libraries: error handling, logging, ledger storage, and related utilities
    4. core: the core library of fabric; each subdirectory is a module, e.g. comm for network communication
    5. devenv: the official development environment, built on Vagrant
    6. docs: documentation
    7. events: the event-listening mechanism
    8. examples: sample programs
    9. gossip: the gossip protocol, used for intra-organization communication and block synchronization
    10. gotools: build tooling
    11. images: docker image material
    12. msp: membership service provider; reads certificates and signs with them
    13. orderer: the ordering node
    14. peer: the peer node
    15. proposals: proposals for extensions and new features
    16. protos: data structure definitions

4. Consensus Mechanism Source Code

The orderer node's source code

  • Start with main.go in the orderer directory; the NewServer call in main.go leads into server.go

func main() is essentially a dispatcher: when it receives the start command it loads and initializes the various configurations, and when it receives the version command it prints the version number. The functions it calls are defined further down in the file.

func main() {

	kingpin.Version("0.0.1")
	// Dispatch on the received command-line arguments
	switch kingpin.MustParse(app.Parse(os.Args[1:])) {

	// The "start" command
	case start.FullCommand():
		logger.Infof("Starting %s", metadata.GetVersionInfo())
		// Load the configuration
		conf := config.Load()
		// Initialize the logging level
		// (raised in production)
		initializeLoggingLevel(conf)
		// Initialize profiling, Go's built-in facility for observing a
		// running program, exposed over HTTP
		initializeProfilingService(conf)
		// Initialize the gRPC server
		grpcServer := initializeGrpcServer(conf)
		// Load the local MSP signing certificates
		initializeLocalMsp(conf)
		// Instantiate a signer backed by the MSP certificates
		signer := localmsp.NewSigner()
		// Initialize the manager of all chains (the multichain manager)
		manager := initializeMultiChainManager(conf, signer)
		// Instantiate the service
		server := NewServer(manager, signer)
		// and register it with the gRPC server
		ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
		logger.Info("Beginning to serve requests")
		// Start serving
		grpcServer.Start()
	// The "version" command
	case version.FullCommand():
		// Print the version number
		fmt.Println(metadata.GetVersionInfo())
	}

}

Now let's look at the code that initializes the manager:

func initializeMultiChainManager(conf *config.TopLevel, signer crypto.LocalSigner) multichain.Manager {
	// Create the ledger factory, which produces the orderer's intermediate blocks
	lf, _ := createLedgerFactory(conf)
	// Check whether any chain exists yet
	if len(lf.ChainIDs()) == 0 {
		// No chain exists:
		// bootstrap the system chain
		initializeBootstrapChannel(conf, lf)
	} else {
		logger.Info("Not bootstrapping because of existing chains")
	}
	// Instantiate the consensus implementations;
	// there are two modes, solo and kafka
	consenters := make(map[string]multichain.Consenter)
	consenters["solo"] = solo.New()
	consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version)

	return multichain.NewManagerImpl(lf, consenters, signer)
}
  • The orderer's configuration lives in orderer.yaml: the listen address is 127.0.0.1, the listen port is 7050, BCCSP configures the cryptography provider, and the ledger is ultimately persisted to disk.
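
For orientation, an abridged and approximate excerpt of the relevant orderer.yaml settings follows; the exact keys and defaults vary by Fabric release, and the ledger path shown is hypothetical:

General:
    ListenAddress: 127.0.0.1   # address the orderer listens on
    ListenPort: 7050           # port the orderer listens on
    BCCSP:                     # cryptography provider settings
        Default: SW
        SW:
            Hash: SHA2
            Security: 256
FileLedger:
    Location: /var/hyperledger/production/orderer   # hypothetical on-disk ledger path
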
  • Next, let's look at the service instantiated in server.go, which defines transaction collection and block delivery (a sketch of how the two handlers are wired follows the struct).
type server struct {
	// collects transactions (Broadcast)
	bh broadcast.Handler
	// delivers blocks (Deliver)
	dh deliver.Handler
}
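
For orientation, here is a condensed sketch of what a constructor like NewServer does. The handler-constructor argument shapes are assumptions; the real code also wraps the manager in small adapter types and wires in CONFIG_UPDATE pre-processing, both omitted here:

// Condensed sketch (not verbatim): NewServer builds the two handlers around
// the multichain.Manager and returns the gRPC service implementation.
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {
	return &server{
		// block delivery handler, fed by the multichain manager (assumed constructor shape)
		dh: deliver.NewHandlerImpl(ml),
		// transaction collection handler, fed by the same manager (assumed constructor shape)
		bh: broadcast.NewHandlerImpl(ml),
	}
}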

Let's look at transaction collection in detail, in broadcast.go:

func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	logger.Debugf("Starting new broadcast loop")
	for {
		// Receive a transaction
		msg, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF, hangup")
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from stream: %s", err)
			return err
		}

		payload, err := utils.UnmarshalPayload(msg.Payload)
		if err != nil {
			logger.Warningf("Received malformed message, dropping connection: %s", err)
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
		}

		// Validate the message body; on any error, return Status_BAD_REQUEST
		if payload.Header == nil {
			logger.Warningf("Received malformed message, with missing header, dropping connection")
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
		}

		chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
		}

		if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
			logger.Debugf("Preprocessing CONFIG_UPDATE")
			msg, err = bh.sm.Process(msg)
			if err != nil {
				logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
			}

			err = proto.Unmarshal(msg.Payload, payload)
			if err != nil || payload.Header == nil {
				logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
			}

			chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
			if err != nil {
				logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
			}

			if chdr.ChannelId == "" {
				logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
			}
		}

		// Look up the channel's support object
		support, ok := bh.sm.GetChain(chdr.ChannelId)
		if !ok {
			logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
		}

		logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])

		// Pass the message through the support object's filters.
		// This is the first filtering pass; the second happens when blocks are cut.
		_, filterErr := support.Filters().Apply(msg)

		if filterErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
		}
		// Enqueue the message; it will then be processed by solo or kafka
		if !support.Enqueue(msg) {
			return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
		}

		if logger.IsEnabledFor(logging.DEBUG) {
			logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
		}
		// Return the success status (200)
		err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
		if err != nil {
			logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
			return err
		}
	}
}
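
To make the server loop concrete, here is a hedged sketch of the client side of Broadcast, using the generated AtomicBroadcast gRPC client; the endpoint, the insecure transport, and the pre-built envelope are assumptions for illustration:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// broadcastOne sends a single, already-signed envelope to the orderer and
// reports the status the Broadcast handler above returns.
func broadcastOne(env *cb.Envelope) {
	// assumed endpoint; insecure transport for brevity
	conn, err := grpc.Dial("127.0.0.1:7050", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("failed to connect to orderer: %s", err)
	}
	defer conn.Close()

	stream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
	if err != nil {
		log.Fatalf("failed to open broadcast stream: %s", err)
	}

	if err := stream.Send(env); err != nil {
		log.Fatalf("failed to send envelope: %s", err)
	}

	// The handler answers every message with a BroadcastResponse.
	resp, err := stream.Recv()
	if err != nil {
		log.Fatalf("failed to read response: %s", err)
	}
	log.Printf("orderer status: %s", resp.Status) // Status_SUCCESS when enqueued
}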

Now let's look at block delivery in detail, in deliver.go:

func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
	logger.Debugf("Starting new deliver loop")
	for {
		logger.Debugf("Attempting to read seek info message")
		// Receive a request
		envelope, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF, hangup")
			return nil
		}

		if err != nil {
			logger.Warningf("Error reading from stream: %s", err)
			return err
		}
		// Validate it
		payload, err := utils.UnmarshalPayload(envelope.Payload)
		if err != nil {
			logger.Warningf("Received an envelope with no payload: %s", err)
			return sendStatusReply(srv, cb.Status_BAD_REQUEST)
		}

		if payload.Header == nil {
			logger.Warningf("Malformed envelope received with bad header")
			return sendStatusReply(srv, cb.Status_BAD_REQUEST)
		}

		chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			logger.Warningf("Failed to unmarshal channel header: %s", err)
			return sendStatusReply(srv, cb.Status_BAD_REQUEST)
		}
		// Look up the chain object
		chain, ok := ds.sm.GetChain(chdr.ChannelId)
		if !ok {
			// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
			// So we would expect our log to be somewhat flooded with these
			logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
			return sendStatusReply(srv, cb.Status_NOT_FOUND)
		}
		// Watch whether a consenter error has occurred;
		// if so, return 503
		erroredChan := chain.Errored()
		select {
		case <-erroredChan:
			logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
			return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
		default:

		}

		lastConfigSequence := chain.Sequence()
		// Check the request against the channel's reader policy
		sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
		result, _ := sf.Apply(envelope)
		if result != filter.Forward {
			logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
			return sendStatusReply(srv, cb.Status_FORBIDDEN)
		}
		// Parse the seek request out of the message
		seekInfo := &ab.SeekInfo{}
		if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
			logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
			return sendStatusReply(srv, cb.Status_BAD_REQUEST)
		}
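		// ... (the remainder of the loop, which validates the seek request and
		// streams the requested blocks back to the client, is omitted in this excerpt)
	}
}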
  • In server.go you will come across chain lookups that go through the Manager; see Manage.go:

type Manager interface {
	// Get a chain object
	GetChain(chainID string) (ChainSupport, bool)

	// Get the system channel ID; the system chain bootstraps the creation of other chains
	SystemChannelID() string

	// Create or update a chain's configuration
	NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error)
}

// The configuration resources
type configResources struct {
	configtxapi.Manager
}

// Get the orderer-related configuration;
// step into config.Orderer to see the settings it exposes
func (cr *configResources) SharedConfig() config.Orderer {
	oc, ok := cr.OrdererConfig()
	if !ok {
		logger.Panicf("[channel %s] has no orderer configuration", cr.ChainID())
	}
	return oc
}

// The ledger resources
type ledgerResources struct {
	// the configuration resources
	*configResources
	// the ledger read/write object:
	// the entry point for all operations on the ledger
	ledger ledger.ReadWriter
}

// The concrete implementation of Manager
type multiLedger struct {
	// the chains
	chains          map[string]*chainSupport
	// the consensus implementations
	consenters      map[string]Consenter
	// the ledger read/write factory
	ledgerFactory   ledger.Factory
	// the signing object
	signer          crypto.LocalSigner
	// the ID of the system chain
	systemChannelID string
	// the system chain itself
	systemChannel   *chainSupport
}

// Get the latest configuration transaction on a chain
func getConfigTx(reader ledger.Reader) *cb.Envelope {
	// Fetch the newest block on the chain
	lastBlock := ledger.GetBlock(reader, reader.Height()-1)
	// The newest block's metadata points at the block holding the latest configuration transaction
	index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
	if err != nil {
		logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
	}
	// Read the configuration block
	configBlock := ledger.GetBlock(reader, index)
	if configBlock == nil {
		logger.Panicf("Config block does not exist")
	}
	// Extract the latest configuration transaction
	return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}

// Instantiate the Manager
func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager {
	// Take the incoming parameters and
	// assign them directly to the struct defined above
	ml := &multiLedger{
		chains:        make(map[string]*chainSupport),
		ledgerFactory: ledgerFactory,
		consenters:    consenters,
		signer:        signer,
	}

	// Read the IDs of the chains stored locally
	existingChains := ledgerFactory.ChainIDs()
	// and iterate over them
	for _, chainID := range existingChains {
		// Instantiate a ledger reader from the ledger factory
		// (rl: read ledger)
		rl, err := ledgerFactory.GetOrCreate(chainID)
		if err != nil {
			logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
		}
		// Fetch the latest configuration transaction
		configTx := getConfigTx(rl)
		if configTx == nil {
			logger.Panic("Programming error, configTx should never be nil here")
		}
		// Bind the configuration transaction to the ledger object
		ledgerResources := ml.newLedgerResources(configTx)
		chainID := ledgerResources.ChainID()

		// Check whether the chain carries a consortiums configuration.
		// A consortiums configuration grants the right to create other chains;
		// normally only the system chain has one.
		if _, ok := ledgerResources.ConsortiumsConfig(); ok {
			// This chain has a consortiums configuration, so it is the system chain
			if ml.systemChannelID != "" {
				// a system chain already exists: panic
				logger.Panicf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
			}
			// Instantiate its ChainSupport, assigning each field in turn
			chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
				ledgerResources,
				consenters,
				signer)
			logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
			ml.chains[chainID] = chain
			ml.systemChannelID = chainID
			ml.systemChannel = chain
			// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
			// deferred start:
			// the system chain is started only after the other chains are up
			defer chain.start()
		} else {
			logger.Debugf("Starting chain: %s", chainID)
			chain := newChainSupport(createStandardFilters(ledgerResources),
				ledgerResources,
				consenters,
				signer)
			// register the standard chain
			ml.chains[chainID] = chain
			// and start it
			chain.start()
		}

	}
	// If no system chain was found, panic
	if ml.systemChannelID == "" {
		logger.Panicf("No system chain found.  If bootstrapping, does your system channel contain a consortiums group definition?")
	}
	// return the manager
	return ml
}

// Return the system chain's ID
func (ml *multiLedger) SystemChannelID() string {
	return ml.systemChannelID
}

// GetChain retrieves the chain support for a chain (and whether it exists)
// look up the chain
func (ml *multiLedger) GetChain(chainID string) (ChainSupport, bool) {
	cs, ok := ml.chains[chainID]
	return cs, ok
}

// Instantiate a ledger-resources object
func (ml *multiLedger) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
	// Initialize handling of the configuration transaction
	initializer := configtx.NewInitializer()
	// and build the configuration manager from it
	configManager, err := configtx.NewManagerImpl(configTx, initializer, nil)
	if err != nil {
		logger.Panicf("Error creating configtx manager and handlers: %s", err)
	}
	// Get the chainID
	chainID := configManager.ChainID()
	// Instantiate the ledger object for that chainID
	ledger, err := ml.ledgerFactory.GetOrCreate(chainID)
	if err != nil {
		logger.Panicf("Error getting ledger for %s", chainID)
	}
	// Finally, return the populated ledger-resources object
	return &ledgerResources{
		configResources: &configResources{Manager: configManager},
		ledger:          ledger,
	}
}

// Create a new chain
func (ml *multiLedger) newChain(configtx *cb.Envelope) {
	// Create its ledger-resources object
	ledgerResources := ml.newLedgerResources(configtx)
	// Assemble the genesis block and Append it to the chain
	ledgerResources.ledger.Append(ledger.CreateNextBlock(ledgerResources.ledger, []*cb.Envelope{configtx}))

	// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is added;
	// the copy takes the place of a lock
	newChains := make(map[string]*chainSupport)
	for key, value := range ml.chains {
		newChains[key] = value
	}

	cs := newChainSupport(createStandardFilters(ledgerResources), ledgerResources, ml.consenters, ml.signer)
	chainID := ledgerResources.ChainID()

	logger.Infof("Created and starting new chain %s", chainID)

	newChains[string(chainID)] = cs
	cs.start()

	ml.chains = newChains
}

func (ml *multiLedger) channelsCount() int {
	return len(ml.chains)
}

// Create the configuration for a new chain
func (ml *multiLedger) NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error) {

	// Everything below validates the update before the new chain is created
	configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
	if err != nil {
		return nil, fmt.Errorf("Failing initial channel config creation because of payload unmarshaling error: %s", err)
	}
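	// ... (the remaining validation and the construction of the new channel's
	// configuration manager are omitted in this excerpt)
}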
  • Next, look at chainsupport.go:

// The interface the consenters (consensus implementations) program against
type ConsenterSupport interface {
	// local signing
	crypto.LocalSigner
	// the block-cutting object
	BlockCutter() blockcutter.Receiver
	// the shared orderer configuration
	SharedConfig() config.Orderer
	// pack a batch of cut transactions into a block
	CreateNextBlock(messages []*cb.Envelope) *cb.Block
	// write a block out
	WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block
	// the chain's ID
	ChainID() string
	// the chain's current block height
	Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}

type ChainSupport interface {
	// the policy manager (e.g. who may read from or write to the channel)
	PolicyManager() policies.Manager

	// read access to the ledger
	Reader() ledger.Reader

	// surfaces consenter/ledger errors
	Errored() <-chan struct{}

	// the interface for accepting incoming transactions
	broadcast.Support
	// the interface the consenters program against
	ConsenterSupport

	// the configuration sequence number;
	// it is incremented by 1 every time the chain's configuration changes
	Sequence() uint64

	// turn a proposed update into a configuration transaction
	// (an Envelope can be thought of as a transaction)
	ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}

type chainSupport struct {
	// the chain's resources: its configuration plus the ledger read/write object
	*ledgerResources
	// the chain driven by the consenter
	chain         Chain
	// the block cutter
	cutter        blockcutter.Receiver
	// the filters; the orderer drops data such as empty transactions here
	filters       *filter.RuleSet
	// the signer
	signer        crypto.LocalSigner
	// the height of the block holding the latest configuration
	lastConfig    uint64
	// the sequence number of the latest configuration
	lastConfigSeq uint64
}

func newChainSupport(
	filters *filter.RuleSet,
	ledgerResources *ledgerResources,
	consenters map[string]Consenter,
	signer crypto.LocalSigner,
) *chainSupport {
	// Create the block-cutting object
	cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
	// Look up which consensus type this orderer is configured with
	consenterType := ledgerResources.SharedConfig().ConsensusType()
	// and fetch the matching consenter
	consenter, ok := consenters[consenterType]
	if !ok {
		logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
	}

	// Populate the struct
	cs := &chainSupport{
		ledgerResources: ledgerResources,
		cutter:          cutter,
		filters:         filters,
		signer:          signer,
	}
	// the configuration sequence number
	cs.lastConfigSeq = cs.Sequence()

	var err error
	// fetch the newest block
	lastBlock := ledger.GetBlock(cs.Reader(), cs.Reader().Height()-1)

	if lastBlock.Header.Number != 0 {
		// Get the height of the block holding the latest configuration
		cs.lastConfig, err = utils.GetLastConfigIndexFromBlock(lastBlock)
		if err != nil {
			logger.Fatalf("[channel: %s] Error extracting last config block from block metadata: %s", cs.ChainID(), err)
		}
	}
	// Get the orderer metadata from the block
	metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER)
	if err != nil {
		logger.Fatalf("[channel: %s] Error extracting orderer metadata: %s", cs.ChainID(), err)
	}
	logger.Debugf("[channel: %s] Retrieved metadata for tip of chain (blockNumber=%d, lastConfig=%d, lastConfigSeq=%d): %+v", cs.ChainID(), lastBlock.Header.Number, cs.lastConfig, cs.lastConfigSeq, metadata)
	// Hand the chain over to the consenter
	cs.chain, err = consenter.HandleChain(cs, metadata)
	if err != nil {
		logger.Fatalf("[channel: %s] Error creating consenter: %s", cs.ChainID(), err)
	}

	return cs
}

// Instantiate the standard filter set
func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
	return filter.NewRuleSet([]filter.Rule{
		filter.EmptyRejectRule,
		sizefilter.MaxBytesRule(ledgerResources.SharedConfig()),
		sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
		configtxfilter.NewFilter(ledgerResources),
		filter.AcceptRule,
	})

}
func (cs *chainSupport) WriteBlock(block *cb.Block, committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
	// Run each committer attached to this block
	for _, committer := range committers {
		committer.Commit()
	}
	// Set the orderer-related metadata field
	// and attach the orderer metadata, if provided
	if encodedMetadataValue != nil {
		block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
	}
	// Sign the block
	cs.addBlockSignature(block)
	// and sign the last-config metadata
	cs.addLastConfigSignature(block)

	// Append the block to the ledger
	err := cs.ledger.Append(block)
	if err != nil {
		logger.Panicf("[channel: %s] Could not append block: %s", cs.ChainID(), err)
	}
	logger.Debugf("[channel: %s] Wrote block %d", cs.ChainID(), block.GetHeader().Number)

	return block
}
  • Step into Receiver to see how blocks are cut, in blockcutter.go:

func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committerBatches [][]filter.Committer, validTx bool, pending bool) {
	// Filter the transaction a second time;
	// the first pass happened when the orderer received the broadcast request
	committer, err := r.filters.Apply(msg)
	if err != nil {
		logger.Debugf("Rejecting message: %s", err)
		return // We don't bother to determine `pending` here as it's not processed in error case
	}

	// message is valid
	// mark the transaction as valid
	validTx = true

	// Compute the transaction's size in bytes
	messageSizeBytes := messageSizeBytes(msg)

	// Decide whether the transaction must be isolated (configuration transactions are).
	// A transaction larger than the preferred batch size is also treated as oversized and cut into its own block.
	if committer.Isolated() || messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {

		if committer.Isolated() {
			logger.Debugf("Found message which requested to be isolated, cutting into its own batch")
		} else {
			logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, r.sharedConfigManager.BatchSize().PreferredMaxBytes)
		}

		// If there are pending, not-yet-cut transactions,
		// cut them into a batch of their own first
		if len(r.pendingBatch) > 0 {
			messageBatch, committerBatch := r.Cut()
			messageBatches = append(messageBatches, messageBatch)
			committerBatches = append(committerBatches, committerBatch)
		}

		// then cut the current transaction into its own batch
		messageBatches = append(messageBatches, []*cb.Envelope{msg})
		committerBatches = append(committerBatches, []filter.Committer{committer})

		return
	}
	// Non-isolated transactions are handled from here on:
	// check whether adding this transaction would push the batch past the configured preferred size
	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes

	// if it would overflow, cut the pending batch first
	if messageWillOverflowBatchSizeBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		// cut it
		messageBatch, committerBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		committerBatches = append(committerBatches, committerBatch)
	}

	logger.Debugf("Enqueuing message into batch")
	r.pendingBatch = append(r.pendingBatch, msg)
	r.pendingBatchSizeBytes += messageSizeBytes
	r.pendingCommitters = append(r.pendingCommitters, committer)
	pending = true

	// If the pending batch has reached the message-count threshold, cut it
	if uint32(len(r.pendingBatch)) >= r.sharedConfigManager.BatchSize().MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatch, committerBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		committerBatches = append(committerBatches, committerBatch)
		pending = false
	}

	return
}

// Cut performs the actual cut, draining all pending state
func (r *receiver) Cut() ([]*cb.Envelope, []filter.Committer) {
	batch := r.pendingBatch
	r.pendingBatch = nil
	committers := r.pendingCommitters
	r.pendingCommitters = nil
	r.pendingBatchSizeBytes = 0
	return batch, committers
}

func messageSizeBytes(message *cb.Envelope) uint32 {
	// the combined length of the message payload and its signature
	return uint32(len(message.Payload) + len(message.Signature))
}
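
The three cut conditions above can be restated as a tiny, self-contained simulation. The constants are hypothetical stand-ins for BatchSize.PreferredMaxBytes and BatchSize.MaxMessageCount, and the returned strings just mark where Cut() would happen:

package main

import "fmt"

// Hypothetical stand-ins for the shared-config values used by Ordered.
const (
	preferredMaxBytes = 512 * 1024 // BatchSize.PreferredMaxBytes
	maxMessageCount   = 10         // BatchSize.MaxMessageCount
)

type cutter struct {
	pendingCount uint32
	pendingBytes uint32
}

func (c *cutter) ordered(msgBytes uint32, isolated bool) (cuts []string) {
	// Rule 1: isolated or oversized messages get a batch of their own.
	if isolated || msgBytes > preferredMaxBytes {
		if c.pendingCount > 0 {
			cuts = append(cuts, "cut pending batch")
			c.pendingCount, c.pendingBytes = 0, 0
		}
		return append(cuts, "cut message into its own batch")
	}
	// Rule 2: cut first if this message would overflow the byte budget.
	if c.pendingBytes+msgBytes > preferredMaxBytes {
		cuts = append(cuts, "cut pending batch (byte overflow)")
		c.pendingCount, c.pendingBytes = 0, 0
	}
	c.pendingCount++
	c.pendingBytes += msgBytes
	// Rule 3: cut as soon as the message-count threshold is met.
	if c.pendingCount >= maxMessageCount {
		cuts = append(cuts, "cut pending batch (count reached)")
		c.pendingCount, c.pendingBytes = 0, 0
	}
	return cuts
}

func main() {
	c := &cutter{}
	fmt.Println(c.ordered(600*1024, false)) // oversized, so it is isolated
	for i := 0; i < 10; i++ {
		fmt.Println(c.ordered(1024, false)) // the 10th message triggers the count cut
	}
}
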
  • The (solo) consensus loop is in consensus.go:
func (ch *chain) main() {
	// the batch timer
	var timer <-chan time.Time
	// the main loop
	for {
		select {
		// Keep pulling transactions from the transaction channel
		// and hand each one to the block-cutting object,
		// which returns any batches that are ready to be cut
		case msg := <-ch.sendChan:
			// block cutting
			batches, committers, ok, _ := ch.support.BlockCutter().Ordered(msg)
			// If the message was valid, no batch was cut,
			// and no timer is running yet,
			if ok && len(batches) == 0 && timer == nil {
				// start the BatchTimeout timer
				timer = time.After(ch.support.SharedConfig().BatchTimeout())
				continue
			}
			// Create the blocks;
			// they land in the orderer's intermediate ledger
			for i, batch := range batches {
				block := ch.support.CreateNextBlock(batch)
				ch.support.WriteBlock(block, committers[i], nil)
			}
			// If any batch was cut, clear the timer
			if len(batches) > 0 {
				timer = nil
			}
		// When the timer fires,
		// cut a block immediately
		case <-timer:
			//clear the timer
			timer = nil
			// cut whatever is pending
			batch, committers := ch.support.BlockCutter().Cut()
			if len(batch) == 0 {
				logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
				continue
			}
			logger.Debugf("Batch timer expired, creating block")
			// create the block
			block := ch.support.CreateNextBlock(batch)
			// and write it out
			ch.support.WriteBlock(block, committers, nil)
		case <-ch.exitChan:
			logger.Debugf("Exiting")
			// and return
			return
		}
	}
}
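
The select-plus-time.After pattern the solo loop relies on is worth seeing in isolation. Below is a minimal, runnable sketch of the same idea, with hypothetical sizes and timeouts: cut a batch when it reaches a size threshold, or when the batch timeout fires, whichever comes first:

package main

import (
	"fmt"
	"time"
)

func main() {
	msgs := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			msgs <- i
			time.Sleep(300 * time.Millisecond)
		}
		close(msgs)
	}()

	var batch []int
	var timer <-chan time.Time // nil channel: the timer case blocks until armed
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				fmt.Println("final batch:", batch)
				return
			}
			batch = append(batch, m)
			// size-based cut, standing in for MaxMessageCount
			if len(batch) >= 3 {
				fmt.Println("cut (size):", batch)
				batch, timer = nil, nil
				continue
			}
			// the first message of a batch arms the BatchTimeout timer
			if timer == nil {
				timer = time.After(500 * time.Millisecond)
			}
		// timeout-based cut, standing in for BatchTimeout
		case <-timer:
			timer = nil
			fmt.Println("cut (timeout):", batch)
			batch = nil
		}
	}
}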