1. 程式人生 > >07.Fabric原始碼解析——peer的ChaincodeSupport服務

07.Fabric原始碼解析——peer的ChaincodeSupport服務

Fabric原始碼解析7——peer的ChaincodeSupport服務

Fabirc原始碼解析6中講述了peer結點如何建立和註冊grpc服務,接下來的幾篇文章將對peer註冊的各個服務進行詳述。該篇講述ChaincodeSupport服務,ChaincodeSupport服務為每個peer提供了chaincode操作的支援。registerChaincodeSupport(peerServer.Server())一句,位於/fabric/peer/node/start.go檔案中的serve函式中,給peerServer註冊了ChaincodeSupport服務。

ChaincodeSupport的服務原型

ChaincodeSupport的服務原型和生成的go定義在/fabric/protos/peer/下的chaincode_shim.proto和chaincode_shim.pb.go中,核心的實現程式碼在/fabric/core/chaincode/chaincode_support.go中。主要的定義的是一個rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage){}服務。該服務實現客戶端和伺服器端ChaincodeMessage型別流資料的交換。用於服務端流資料交換的grpc流服務介面象為/fabric/protos/peer/chaincode_shim.pb.go中的ChaincodeSupport_RegisterServer,在/fabric/core/container/ccintf/ccintf.go中有對應用於容器內部間的流介面ChaincodeStream。

ChaincodeSupport的服務是一個全域性單例,該單例物件定義在chaincode_support.go中,var theChaincodeSupport *ChaincodeSupport。ChaincodeSupport物件自身儲存一系列配置值,而接收和處理客戶端ChaincodeMessage型別訊息的任務其實是委託給了一個個Handler物件。

//生成的收發的資料型別

type ChaincodeMessage struct {
    Type      ChaincodeMessage_Type      
    Timestamp *google_protobuf1.Timestamp
    Payload   []byte                     
    Txid      string                     
    Proposal  *SignedProposal            
    ChaincodeEvent *ChaincodeEvent
}

//proto中ChaincodeSupport服務原型

service ChaincodeSupport {
    rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {}
}

//生成的服務端流介面

type ChaincodeSupport_RegisterServer interface {
    Send(*ChaincodeMessage) error
    Recv() (*ChaincodeMessage, error)
    grpc.ClientStream
}

FSM

FSM是finite state machine的縮寫,有限狀態機,是ChaincodeSupport服務使用到的一個第三方庫,在github.com/looplab/fsm可以下載。FSM將一個事物從狀態A向狀態B的轉化看作一個事件,並可以設定在進入/離開某個狀態時自動呼叫的時機函式。每個狀態事件、狀態、時機函式都用字串關鍵字表示。在此簡單介紹一下用法:

//建立一個狀態機
//三個引數:1.預設狀態 2.定義狀態事件 3.定義狀態轉變時呼叫的函式

fsm := fsm.NewFSM(
    "green",
    fsm.Events{
        //狀態事件的名稱   該事件的起始狀態Src         該事件的結束狀態Dst
        //即:狀態事件warn(警告事件)表示事物的狀態從狀態green到狀態yellow
        {Name: "warn",  Src: []string{"green"},  Dst: "yellow"},
        {Name: "panic", Src: []string{"yellow"}, Dst: "red"},
        {Name: "calm",  Src: []string{"red"},    Dst: "yellow"},
    },
//狀態事件呼叫函式,在此稱為 時機函式。關鍵字用'_'隔開,格式是:"呼叫時機_事件或狀態"
//before表示在該事件或狀態發生之前呼叫該函式,如"before_warn",表示在warn
//這個狀態事件發生前呼叫這個函式。"before_yellow"表示進入yellow狀態之前呼叫
//該函式。
//依此類推,after表示在...之後,enter表示在進入...之時,leave表示在離開...
//之時。
fsm.Callbacks{
    //fsm內定義的狀態事件函式,關鍵字指定的是XXX_event和XXX_state
    //表示任一的狀態或狀態事件
    "before_event": func(e *fsm.Event) {
    fmt.Println("before_event")
    },
    "leave_state": func(e *fsm.Event) {
    fmt.Println("leave_state")
    },
    //根據自定義狀態或事件所定義的狀態事件函式
    "before_yellow": func(e *fsm.Event) {
    fmt.Println("before_yellow")
    },
    "before_warn": func(e *fsm.Event) {
    fmt.Println("before_warn")
    },
},

)
//列印當前狀態,輸出是預設狀態green

fmt.Println(fsm.Current())

//觸發warn狀態事件,狀態將會從green轉變到yellow
//同時觸發"before_warn"、“before_yellow”、“before_event”、"leave_state"函式

fsm.Event("warn")

//列印當前狀態,輸出狀態是yellow

fmt.Println(fsm.Current())

任何專案中,服務是以所能提供的操作為中心的,ChaincodeSupport服務的操作(即可被外部呼叫的函式)有Launch,Register,Execute,HandleChaincodeStream,Stop。

Register

追溯ChaincodeSupport物件掛載的Register函式,最終呼叫的是/fabric/core/chaincode/handler.go中的HandleChaincodeStream函式。在HandleChaincodeStream函式中:

handler := newChaincodeSupportHandler(chaincodeSupport, stream)
handler.processStream()

建立了一個Handler,然後呼叫Handler的processStream函式對客戶端傳送的流資料進行了處理。這兩個函式都在同文件中實現。newChaincodeSupportHandler函式所傳入的兩個引數值得注意,一個是chaincodeSupport,一個是stream。前者是Register服務所在的ChaincodeSupport物件自身,賦值給了Hanlder物件成員chaincodeSupport,為的是讓Handler物件處理接收資料時能夠使用ChaincodeSupport物件的服務;後者是Register服務的grpc流介面,賦值給了Handler物件成員ChatStream,為的是Handler能夠從客戶端接收到資料。後文還會提到這點。

Handler

newChaincodeSupportHandler建立並初始化了一個Handler,初始化的成員有:

  1. ChatStream - grpc流服務介面,是用Register函式傳進來的。
  2. chaincodeSupport - chaincodeSupport自身。
  3. nextState - 狀態通道。
  4. FSM - 狀態機,參看上文。
  5. policyChecker - 策略檢查器,將在對應主題文章中詳述。

processStream

processStream用recv標識、| errc | msgAvil | nextState | keepalivetimer |四個頻道、select三者相互配合,形成了對客戶端訊息的接收控制。然後呼叫HandleMessage、serialSend、serialSendAsync處理接收到的訊息。

  1. errc - 錯誤頻道
  2. msgAvil - ChaincodeMessage頻道
  3. nextState - 包含ChaincodeMessage的頻道
  4. keepalivetime - 心跳頻道

HandleMessage

HandleMessage處理ChaincodeMessage資料的方式完全是由Handler中的狀態機FSM驅動的。在newChaincodeSupportHandler有大段程式碼是初始化其狀態機的:

v.FSM = fsm.NewFSM(createdstate,fsm.Events{...},fsm.Callbacks{...})

狀態機FSM所註冊的狀態事件有

///fabric/protos/peer/chaincode_shim.pb.go中定義
//REGISTER即pb.ChaincodeMessage_REGISTER.String()對應的字串值,下同
//REGISTER事件表示從狀態createdstate到狀態establishedstate,下略。
REGISTER Src: []string{createdstate}, Dst: establishedstate

READY
PUT_STATE
DEL_STATE
INVOKE_CHAI
COMPLETED
GET_STATE
GET_STATE_B
GET_QUERY_R
GET_HISTORY
QUERY_STATE
QUERY_STATE
ERROR
RESPONSE
INIT
TRANSACTION
RESPONSE
INIT
TRANSACTION

狀態機FSM所涉及的事件狀態有

//在/fabric/core/chaincode/handler.go中以常量的形式定義

createdstate     = "created"
establishedstate = "established"
readystate       = "ready"
endstate         = "end"

狀態機FSM狀態事件所呼叫的時機函式為

//在REGISTER事件發生之前呼叫beforeRegisterEvent,下同。

"before_REGISTER"           : beforeRegisterEvent
"before_COMPLETED"          : beforeCompletedEvent
"after_GET_STATE"           : afterGetState
"after_GET_STATE_BY_RANGE"  : afterGetStateByRange
"after_GET_QUERY_RESULT"    : afterGetQueryResult
"after_GET_HISTORY_FOR_KEY" : afterGetHistoryForKey
"after_QUERY_STATE_NEXT"    : afterQueryStateNext
"after_QUERY_STATE_CLOSE"   : afterQueryStateClose
"after_PUT_STATE"           : enterBusyState
"after_DEL_STATE"           : enterBusyState
"after_INVOKE_CHAINCODE"    : enterBusyState
//表示在進入established狀態之時呼叫enterEstablishedState,下同。
"enter_established"         : enterEstablishedState
"enter_ready"               : enterReadyState
"enter_end"                 : enterEndState

在HandleMessage函式中,對傳入的資料msg簡單驗證後,eventErr := handler.FSM.Event(msg.Type.String(), msg)觸發了狀態機的狀態事件,進而觸發了對應的時機函式。以“REGISTER型別的ChaincodeMessage”為例。客戶端通過grpc傳送REGISTER型別的ChaincodeMessage資訊,服務端通過msgAvil頻道接收後傳入HandlerMessage函式,狀態機對應執行REGISTER狀態事件,從狀態createdstate向狀態establishedstate轉變,同時在轉變之前自動觸發beforeRegisterEvent時機函式完成註冊。當狀態進入establishedstate時,又接著觸發了“enter_established”所對應的enterEstablishedState時機函式去通知客戶端註冊已經正確完成。

在beforeRegisterEvent函式中,err = handler.chaincodeSupport.registerHandler(handler)完成了註冊,使用的是前文所提到的在建立Handler時傳入進來的ChaincodeSupport物件的registerHandler函式。所謂的註冊,也不過是將Handler物件賦值給ChaincodeSupport物件中的runningChaincodes中的chaincodeMap對映:chainID作key,以Handler物件為成員handler值的chaincodeRTEnv物件作value。

serialSend或serialSendAsync

都是使用Handler中grpc服務端流介面ChatStream成員傳送ChaincodeMessage訊息的函式,兩者都將應答ChaincodeMessage資訊傳送給客戶端,也都實現了將所傳送的ChaincodeMessage資訊序列化的目的。區別在於serialSend是阻塞傳送,而serialSendAsync是利用新啟goroutine進行非阻塞傳送,且這些非阻塞的goroutine中任何一個發生傳送訊息的錯誤,都會利用errc頻道將錯誤傳送給processStream函式。

小結

不同型別的ChaincodeMessage的訊息,能夠觸發狀態機不同的狀態事件,處理資料,完成Chaincode上的操作。有關其他型別事件以及具體的實現,在此不再贅述。強調一句,ChaincodeSupport服務是以狀態機驅動的為chaincode提供支援的一項服務。