1. 程式人生 > >Fabric原始碼分析-共識模組

Fabric原始碼分析-共識模組

正好這些天要有一個需求要幫客戶魔改Fabric-v0.6,把一些hyperchain的高階特性移植過去,藉此機會把之前看過的原始碼在梳理一下。
下面就是對Fabric共識模組的原始碼分析和梳理,程式碼都是以Fabric-v0.6-preview為例,在1.0及後續版本中都移除了PBFT部分,用了更好的SBFT,目前這一部分還在開發中。

目錄結構

可以看到共識模組目錄如下。

consensus
├── controller
├── executor
├── helper
│   └── persist
├── noops
├── pbft
└── util
    └── events

目錄含義如下

  • controller
    用來控制Fabric選擇什麼樣的共識演算法,預設是noops
  • executor 封裝了訊息佇列中對交易的處理。
  • helper 對外提供介面呼叫和資料持久化介面。
  • noops 提供瞭如何編寫Fabric共識演算法的Demo。
  • pbft PBFT演算法的具體實現。
  • util 實現了一個peer節點到共識演算法的一個訊息通道,和一個訊息佇列。

流程概覽

Fabric網路通過一個EventLoop和共識演算法進行互動,所有的操作都通過對事件迴圈中的事件監聽進行推進。

整體流程如下圖所示。

Consensus模組介面

fabric/consensus/consensus.go對外提供共識模組的方法呼叫。

其中最核心也是每個演算法必須實現的介面是Consenter

type ExecutionConsumer interface {
    Executed(tag interface{})                                
    Committed(tag interface{}, target *pb.BlockchainInfo)    
    RolledBack(tag interface{})                              
    StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}

type Consenter
interface { RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error ExecutionConsumer }

介面的具體實現在fabric/consensus/pbft/external.go

因為對交易的操作都是非同步的,所以必須手動實現ExecutedCommittedRolledBackStateUpdated方法來監聽對應動作的完成。

RecvMsg方法用來從不用的peer節點接收訊息。

初始化共識模組

共識演算法引擎在peer啟動的時候初始化,初始化的具體函式如下所示。

// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
    var err error
    engineOnce.Do(func() {
        engine = new(EngineImpl)
        engine.helper = NewHelper(coord)
        engine.consenter = controller.NewConsenter(engine.helper)
        engine.helper.setConsenter(engine.consenter)
        engine.peerEndpoint, err = coord.GetPeerEndpoint()
        engine.consensusFan = util.NewMessageFan()

        go func() {
            logger.Debug("Starting up message thread for consenter")

            for msg := range engine.consensusFan.GetOutChannel() {
                engine.consenter.RecvMsg(msg.Msg, msg.Sender)
            }
        }()
    })
    return engine, err
}

GetEngine的作用是進行共識模組的初始化,同時啟動一個goroutine等待訊息進入。

具體的engine.consenter是在consensus/controller/controller.go裡選擇。

// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {

    plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
    if plugin == "pbft" {
        logger.Infof("Creating consensus plugin %s", plugin)
        return pbft.GetPlugin(stack)
    }
    logger.Info("Creating default consensus plugin (noops)")
    return noops.GetNoops(stack)

}

預設選擇的是noops,如果需要新增自己編寫的共識模組需要在這裡自行新增判斷。

noops 只是演示如何編寫Fabric共識模組,不要用在生產環境。

如果選擇了PBFT則會呼叫consensus/pbft/pbft.go進行初始化。

使用PBFTbatch模式啟動時會呼叫newObcBatch進行PBFT演算法初始化。

PBFT只有batch一種模式。
// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
    var err error

    ...

    op.manager = events.NewManagerImpl()    
    op.manager.SetReceiver(op)
    etf := events.NewTimerFactoryImpl(op.manager)
    op.pbft = newPbftCore(id, config, op, etf)
    op.manager.Start()
    blockchainInfoBlob := stack.GetBlockchainInfoBlob()
    op.externalEventReceiver.manager = op.manager

    ...

    return op
}

newObcBatch主要做了這幾項工作

  • 初始化了eventLoop的訊息佇列。
  • 設定了訊息的接收者,用來處理對應的訊息。
  • 建立監聽訊息超時的定時器。
  • 初始化pbft演算法。
  • 啟動訊息佇列,不斷監聽事件的到來並且分發給接收者處理。

訊息處理

Fabric的共識訊息是通過eventLoop注射給對應處理函式的。

// consensus/util/events/events.go
func SendEvent(receiver Receiver, event Event) {
    next := event
    for {
        next = receiver.ProcessEvent(next)
        if next == nil {
            break
        }
    }
}

func (em *managerImpl) Inject(event Event) {
    if em.receiver != nil {
        SendEvent(em.receiver, event)
    }
}

func (em *managerImpl) eventLoop() {
    for {
        select {
        case next := <-em.events:
            em.Inject(next)
        case <-em.exit:
            logger.Debug("eventLoop told to exit")
            return
        }
    }
}

eventLoop函式不斷的從em.events裡取出事件,通過Inject注射給對應的接收者,注意,通過SendEvent注射給接收者的ProcessEvent方法。

SendEvent函式實現非常有意思,如果receiver.ProcessEvent的返回不為nil則不斷的呼叫receiver.ProcessEvent直到找到對應的訊息處理函式,在ProcessEvent函式中,其餘case均為事件處理函式,唯獨pbftMessage依賴SendEvent傳送訊息給其餘函式處理。

// consensus/pbft/pbft-core.go
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
    ...

    case *pbftMessage:
        return pbftMessageEvent(*et)
    case pbftMessageEvent:
        msg := et
        logger.Debugf("Replica %d received incoming message from %v", instance.id, msg.sender)
        next, err := instance.recvMsg(msg.msg, msg.sender)
        if err != nil {
            break
        }
        return next
    case *RequestBatch:
        err = instance.recvRequestBatch(et)
    case *PrePrepare:
        err = instance.recvPrePrepare(et)

    ...
}

可以看到*pbftMessagepbftMessageEvent這兩個case通過recvMsg的返回值又把訊息分發給其餘case,非常巧妙。

PBFT演算法的不同階段都會按著上面的流程對映到不同的處理函式往前推進,本質上是一個狀態機。

至此Fabric的Consensus模組主要流程已經梳理清楚,熟悉了這個流程以後再結合PBFT演算法的過程就可以很容易在此基礎上新增新的功能了。

https://zhuanlan.zhihu.com/p/35255567