1. 程式人生 > >菜鳥學習Fabric原始碼學習 — 背書節點和鏈碼容器互動

菜鳥學習Fabric原始碼學習 — 背書節點和鏈碼容器互動

Fabric 1.4 原始碼分析 背書節點和鏈碼容器互動

本文件主要介紹背書節點和鏈碼容器互動流程,在Endorser背書節點章節中,無論是deploy、upgrade或者呼叫鏈碼,最後都會呼叫ChaincodeSupport.LaunchInit()/Launch()以及ChaincodeSupport.execute()方法。其中Launch()方法啟動鏈碼容器,execute()方法呼叫鏈碼。

1. 準備

ChaincodeSupport.Launch()首先進行判斷,根據peer側該版本鏈碼的Handler是否存在,存在則表示已執行。若不存在,則呼叫lscc鏈碼方法cs.Lifecycle.ChaincodeContainerInfo()獲取啟動鏈碼所需的資料ChaincodeContainerInfo。再呼叫cs.Launcher.Launch()方法啟動鏈碼。再判斷是否註冊了handler。

func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
    cname := chaincodeName + ":" + chaincodeVersion
    if h := cs.HandlerRegistry.Handler(cname); h != nil {
        return h, nil
    }

    ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe)
    if err != nil {
        // TODO: There has to be a better way to do this...
        if cs.UserRunsCC {
            chaincodeLogger.Error(
                "You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?",
            )
        }

        return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname)
    }

    if err := cs.Launcher.Launch(ccci); err != nil {
        return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname)
    }

    h := cs.HandlerRegistry.Handler(cname)
    if h == nil {
        return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
    }

    return h, nil
}

type ChaincodeContainerInfo struct {
    Name        string
    Version     string
    Path        string
    Type        string
    CodePackage []byte

    // ContainerType is not a great name, but 'DOCKER' and 'SYSTEM' are the valid types
    ContainerType string
}

Launch()主要實現方法在core/chaincode/runtime_launcher.go Launch()方法。在該方法中,會呼叫r.Runtime.Start(ccci, codePackage)啟動鏈碼,在該方法中,首先會呼叫c.LaunchConfig(cname, ccci.Type)生成建立鏈碼所需的引數LaunchConfig(鏈碼型別go/java/nodejs,以及TLS配置),然後構造啟動鏈碼容器請求StartContainerReq。接著呼叫c.Processor.Process(ccci.ContainerType, scr)正式啟動鏈碼容器。操作完成後,通過Launch()裡面的select—case語句阻塞獲取結果,並結束程式執行。

func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
    ...
    if !alreadyStarted {
        ...
        go func() {
            if err := r.Runtime.Start(ccci, codePackage); err != nil {
                startFailCh <- errors.WithMessage(err, "error starting container")
                return
            }
            exitCode, err := r.Runtime.Wait(ccci)
            if err != nil {
                launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
            }
            launchState.Notify(errors.Errorf("container exited with %d", exitCode))
        }()
    }

    var err error
    select {
    case <-launchState.Done():
        err = errors.WithMessage(launchState.Err(), "chaincode registration failed")
    case err = <-startFailCh:
        launchState.Notify(err)
        r.Metrics.LaunchFailures.With("chaincode", cname).Add(1)
    case <-timeoutCh:
        err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname)
        launchState.Notify(err)
        r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1)
    }

    ...
    return err
}

經上面可知,在啟動鏈碼容器時會呼叫c.Processor.Process()方法,其中會呼叫req.Do(v)。存在3個實現,分別是StartContainerReq、WaitContainerReq、StopContainerReq。啟動時是呼叫StartContainerReq。

func (si StartContainerReq) Do(v VM) error {
    return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}

2. 啟動系統鏈碼

啟動系統鏈碼(程序模式)的話,則v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)的實現是在core/container/inproccontroller/inproccontroller.go start()方法。

func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
    path := ccid.GetName() // name=Name-Version
    // 獲取已註冊的inprocContainer模版
    ipctemplate := vm.registry.getType(path)
    ...
    instName := vm.GetVMName(ccid)
    // 構建chaincode例項ipc
    ipc, err := vm.getInstance(ipctemplate, instName, args, env)

    // 判斷鏈碼是否執行
    if ipc.running {
        return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
    }

    ipc.running = true

    go func() {
        defer func() {
            if r := recover(); r != nil {
                inprocLogger.Criticalf("caught panic from chaincode  %s", instName)
            }
        }()
        // 程序模式執行鏈碼
        ipc.launchInProc(instName, args, env)
    }()

    return nil
}

在start()方法方法中,首先會獲取ccid的name,然後根據name獲取已註冊的系統鏈碼模版ipctemplate,根據模版及args、env等引數構建系統鏈碼例項ipc,然後再判斷是否運行了系統鏈碼,如果沒有執行,則開啟協程呼叫launchInProc()方法程序模式啟動系統鏈碼。

在launchInProc()中開啟了2個協程,協程一主要執行shimStartInProc()方法,協程二主要執行HandleChaincodeStream()方法。並且新建了2個通道,便於peer側和鏈碼側通訊。

func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
    if ipc.ChaincodeSupport == nil {
        inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
    }
    // 建立peer側接收鏈碼側傳送通道
    peerRcvCCSend := make(chan *pb.ChaincodeMessage)
    // 建立鏈碼側接收peer側傳送通道
    ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
    var err error
    // 傳遞鏈碼側Handler物件執行狀態的通道
    ccchan := make(chan struct{}, 1)
    // 傳遞peer側Handler物件執行狀態的通道
    ccsupportchan := make(chan struct{}, 1)
    shimStartInProc := _shimStartInProc // shadow to avoid race in test
    go func() {
        defer close(ccchan)
        inprocLogger.Debugf("chaincode started for %s", id)
        if args == nil {
            args = ipc.args
        }
        if env == nil {
            env = ipc.env
        }
        // 啟動系統鏈碼
        err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
        if err != nil {
            err = fmt.Errorf("chaincode-support ended with err: %s", err)
            _inprocLoggerErrorf("%s", err)
        }
        inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
    }()

    // shadow function to avoid data race
    inprocLoggerErrorf := _inprocLoggerErrorf
    go func() {
        defer close(ccsupportchan)
        inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
        inprocLogger.Debugf("chaincode-support started for  %s", id)
        // 啟動peer側Handler處理控制代碼,建立訊息迴圈,處理鏈碼側傳送的訊息
        err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
        if err != nil {
            err = fmt.Errorf("chaincode ended with err: %s", err)
            inprocLoggerErrorf("%s", err)
        }
        inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
    }()
    // 阻塞等待訊息處理
    select {
    // 鏈碼側退出,關閉peer側接收鏈碼側傳送通道
    case <-ccchan:
        close(peerRcvCCSend)
        inprocLogger.Debugf("chaincode %s quit", id)
    // peer側chaincode support退出
    case <-ccsupportchan:
        close(ccRcvPeerSend)
        inprocLogger.Debugf("chaincode support %s quit", id)
    case <-ipc.stopChan:
        close(ccRcvPeerSend)
        close(peerRcvCCSend)
        inprocLogger.Debugf("chaincode %s stopped", id)
    }
    return err
}
  • 鏈碼側:

shimStartInProc()方法本質上是執行StartInProc()方法,首先遍歷環境變數,獲取CORE_CHAINCODE_ID_NAME,在執行newInProcStream()建立通訊流,本質上只是將鏈碼側和peer側傳送接收的兩個通道繫結。再執行chatWithPeer()方法與peer側互動。chatWithPeer()首先呼叫newChaincodeHandler()建立鏈碼側Handler,然後傳送第一個註冊訊息,然後開啟訊息迴圈進行處理。

// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
    return errors.WithMessage(err, "error sending chaincode REGISTER")
}
  • peer側:

該協程中,首先newInProcStream()建立通訊流此處和鏈碼側剛剛相反。再呼叫HandleChaincodeStream()方法,首先建立peer側Handle,再呼叫handler.ProcessStream(stream)對通訊流進行處理(裡面也有個迴圈)。

具體互動流程後續介紹。

3. 啟動應用鏈碼

當啟動應用鏈碼(docker容器模式)時,Start()介面實現為core/container/dockercontroller/dockercontroller.go Start()方法。

在Start()方法中,首先呼叫GetVMNameForDocker方法生成映象名networkId-peerid-name-version-Hash(networkId-peerid-name-version),在呼叫GetVMName()方法生成容器名(networkId-peerid-name-version)。在呼叫getClientFnc()獲取docker客戶端,判斷當前是否執行鏈碼容器,執行則停止當前執行的容器。接著呼叫createContainer()建立容器,如果報不存在映象,則構建映象,再建立鏈碼容器。如果需要配置TLS,則呼叫UploadToContainer()方法提交TLS證書檔案。再呼叫StartContainer()正式啟動鏈碼容器。

當鏈碼容器啟動後,會執行shim.start()方法。首先會獲取通訊流與peer側通訊。再呼叫chatWithPeer()方法。此處介紹獲取通訊流方法。

func userChaincodeStreamGetter(name string) (PeerChaincodeStream, error) {
    flag.StringVar(&peerAddress, "peer.address", "", "peer address")
    ... 
    // Establish connection with validating peer
    // 與peer建立連線
    clientConn, err := newPeerClientConnection()
    ...
    // 建立鏈碼支援服務客戶端
    chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)
    ...
    // Establish stream with validating peer
    // 呼叫Register()介面獲取通訊流
    stream, err := chaincodeSupportClient.Register(context.Background())
    return stream, nil
}

當執行chaincodeSupportClient.Register()方法時peer側會執行HandleChaincodeStream()方法。

func (cs *ChaincodeSupport) Register(stream pb.ChaincodeSupport_RegisterServer) error {
    return cs.HandleChaincodeStream(stream)
}

4. 背書節點和鏈碼互動

4.1 準備

在構建系統鏈碼和應用鏈碼流程中,peer側執行HandleChaincodeStream()方法,鏈碼側執行chatWithPeer()方法,並通過通訊流來進行互動。其中,兩個方法中對訊息處理的方法為handleMessage()

  • 鏈碼側
switch handler.state {
case ready:
    err = handler.handleReady(msg, errc)
case established:
    err = handler.handleEstablished(msg, errc)
case created:
    err = handler.handleCreated(msg, errc)
default:
    err = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
}
  • peer側
switch h.state {
case Created:
    return h.handleMessageCreatedState(msg)
case Ready:
    return h.handleMessageReadyState(msg)
default:
    return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid)
}

接下來按照訊息流程介紹

  1. 鏈碼側傳送REGISTER訊息
    • 首先進行各項基本配置,然後建立起與Peer節點的gRPC連線。
    • 建立Handler,並更改Handler狀態為“Created”。
    • 傳送REGISTER訊息到peer節點。
    • 等待peer節點返回的資訊
  2. peer側接收REGISTER訊息
    • 此時peer側Handler狀態為“Created”,呼叫handleMessageCreatedState()裡面的HandleRegister()方法。
    • peer側註冊Handler,併發送REGISTERED訊息給鏈碼側
    • 更新peer側Handler狀態為“Established”
    • 並且會呼叫notifyRegistry()方法,傳送READY訊息給鏈碼側,並更新狀態為“Ready”
  3. 鏈碼側接收訊息
    • 當鏈碼側接收REGISTERED訊息,更新狀態為Handler狀態為“Established”
    • 當鏈碼側接收READY訊息,更新狀態為Handler狀態為“Ready”

至此,鏈碼容器與peer節點已完成連線準備操作。

4.2 執行鏈碼

主要實現是Execute()方法。在背書節點介紹中,存在兩種訊息型別:ChaincodeMessage_TRANSACTION/ChaincodeMessage_INIT。分別對應呼叫鏈碼和例項化鏈碼/升級鏈碼操作。此時鏈碼側和peer側Handler都處於Ready狀態。在該互動流程中,本質上是peer側傳送訊息給鏈碼側通過呼叫鏈碼的Init()/Invoke()方法完成,然後將訊息返回給鏈碼側。

4.2.1 例項化鏈碼/升級鏈碼操作

則peer側傳送的訊息型別為ChaincodeMessage_INIT。在ChaincodeSupport.execute()中會呼叫handler.execute()方法。

func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
    txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
    txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)
    // 建立交易上下文
    txctx, err := h.TXContexts.Create(txParams)
    if err != nil {
        return nil, err
    }
    // 刪除交易上下文
    defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)
    
    if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
        return nil, err
    }
    // 非同步傳送訊息
    h.serialSendAsync(msg)

    var ccresp *pb.ChaincodeMessage
    // 等待鏈碼側響應
    select {
    case ccresp = <-txctx.ResponseNotifier:
        // response is sent to user or calling chaincode. ChaincodeMessage_ERROR
        // are typically treated as error
    case <-time.After(timeout):
        err = errors.New("timeout expired while executing transaction")
        ccName := cccid.Name + ":" + cccid.Version
        h.Metrics.ExecuteTimeouts.With(
            "chaincode", ccName,
        ).Add(1)
    }

    return ccresp, err
}

當鏈碼側接收到ChaincodeMessage_INIT型別訊息時會呼叫handler.handleInit(msg, errc)方法。

case pb.ChaincodeMessage_INIT:
        chaincodeLogger.Debugf("[%s] Received %s, initializing chaincode", shorttxid(msg.Txid), msg.Type)
        // Call the chaincode's Run function to initialize
        handler.handleInit(msg, errc)
        return nil
// handleInit handles request to initialize chaincode.
func (handler *Handler) handleInit(msg *pb.ChaincodeMessage, errc chan error) {
    go func() {
        var nextStateMsg *pb.ChaincodeMessage

        defer func() {
            // 協程結束時執行
            handler.triggerNextState(nextStateMsg, errc)
        }()
        ...
        // Get the function and args from Payload
        // 獲取方法和引數
        input := &pb.ChaincodeInput{}
        unmarshalErr := proto.Unmarshal(msg.Payload, input)
    
        // Call chaincode's Run
        // Create the ChaincodeStub which the chaincode can use to callback
        stub := new(ChaincodeStub)
        err := stub.init(handler, msg.ChannelId, msg.Txid, input, msg.Proposal)
        // 執行鏈碼的Init方法
        res := handler.cc.Init(stub)
        // Send COMPLETED message to chaincode support and change state
        nextStateMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelId}
        chaincodeLogger.Debugf("[%s] Init succeeded. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_COMPLETED)
    }()
}

在handleInit(msg, errc)方法中,會反序列化msg.Payload為鏈碼的輸入,其中包含Args。然後呼叫鏈碼的Init()方法,執行鏈碼初始化流程。並將返回結果、鏈碼事件、交易id以及通道id封裝成ChaincodeMessage_COMPLETED型別的ChaincodeMessage傳送給peer側(triggerNextState()方法呼叫serialSendAsync()傳送給peer)

當peer側接收到對應訊息。core/chaincode/handler.go handleMessageReadyState()。此時會呼叫Notify()方法把訊息寫入ResponseNotifier通道返回response。從而完成鏈碼例項化/升級流程。

switch msg.Type {
case pb.ChaincodeMessage_COMPLETED, pb.ChaincodeMessage_ERROR:
    h.Notify(msg)

4.2.2 呼叫鏈碼

peer側傳送的訊息型別為ChaincodeMessage_TRANSACTION。同理鏈碼側獲取到ChaincodeMessage_TRANSACTION訊息進行處理。會呼叫handler.handleTransaction(msg, errc)方法處理該型別訊息。該型別訊息執行流程和上述流程類似,只是此時呼叫的是鏈碼的Invoke方法。再呼叫過程中會與狀態資料庫存在互動,因此會發送訊息給peer側,peer側與狀態資料庫互動進行處理,完成後傳送訊息給鏈碼側,鏈碼側處理完成後傳送ChaincodeMessage_COMPLETED訊息給peer側。

res := handler.cc.Invoke(stub)
  • 鏈碼側:
    當在鏈碼執行過程中,需要從狀態資料庫獲取訊息時,例如
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
    // Access public data by setting the collection to empty string
    collection := ""
    return stub.handler.handleGetState(collection, key, stub.ChannelId, stub.TxID)
}

會在handleGetState()方法中呼叫callPeerWithChaincodeMsg()方法,再呼叫handler.sendReceive(msg, respChan)將訊息型別ChaincodeMessage_GET_STATE的訊息傳送給peer側。等待peer側的訊息返回,然後進行處理。處理完成後傳送ChaincodeMessage_COMPLETED訊息給peer側。

  • peer側:
    當peer側獲取到對應訊息時會呼叫h.HandleTransaction(msg, h.HandleGetState)進行處理。最後將對應的訊息封裝成ChaincodeMessage_RESPONSE型別訊息給鏈碼側。
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil

h.serialSendAsync(resp)

當鏈碼側處理完成後傳送ChaincodeMessage_COMPLETED訊息給peer側。peer側再通過notify()方法返回訊息給上層介面。

其他訊息型別暫不介紹,詳情請看原始碼。

上述訊息互動過程當中,Peer 和鏈碼側還會進行一項操作,那就是定期相互發送ChaincodeMessage_KEEPALIVE訊息給對方,以確保彼此是線上狀態。

總結

本節主要介紹了背書節點和鏈碼之間的互動流程。首先本節介紹了系統鏈碼和應用鏈碼的建立流程,然後介紹了鏈碼和背書節點之間是如何建立連線、如何傳送訊息的