1. 程式人生 > >菜鳥學習Fabric原始碼學習 — kafka共識機制

菜鳥學習Fabric原始碼學習 — kafka共識機制

Fabric 1.4原始碼分析 kafka共識機制

本文件主要介紹kafka共識機制流程。在檢視文件之前可以先閱覽raft共識流程以及orderer服務啟動流程。

1. kafka 簡介

Kafka是最初由Linkedin公司開發,是一個分散式、分割槽的、多副本的、多訂閱者,基於zookeeper協調的分散式日誌系統,一種高吞吐量的分散式釋出訂閱訊息系統。kafka詳細介紹可以參考這一篇部落格。kafka介紹

2. kafka共識

kafka共識當中,每個orderer節點即是生產者Producer也是消費者Consumer,在具體設計當中,每個channel對應一個topic,並且為了保證順序性,只設置了一個patition。(參見orderer啟動初始化kafka共識程式碼afka.New(conf, metricsProvider, healthChecker, registrar)),關於kafka共識,這裡推薦一篇部落格,可以看看設計思路以及實現流程。The ABCs of Kafka in Hyperledger Fabric

實現共識演算法需要實現的介面。

type Consenter interface {
    // 處理普通交易
    Order(env *cb.Envelope, configSeq uint64) error
    // 處理配置交易
    Configure(config *cb.Envelope, configSeq uint64) error
    WaitReady() error
}

而介面chain在Consenter介面基礎上增加來部分介面

type Chain interface {
    Order(env *cb.Envelope, configSeq uint64) error
    Configure(config *cb.Envelope, configSeq uint64) error
    WaitReady() error
    Errored() <-chan struct{}
    // 分配資源
    Start()
    // 釋放資源
    Halt()
    MigrationStatus() migration.Status
}

kafka共識實現程式碼路徑為:orderer/consensus/kafka/chain.go;首先,在建立chain時會呼叫start()方法分配資源,在kafka共識中,會初始化生產者producer、消費者Consumer以及一些配置。後續重點通過原始碼來介紹producer和consumer模組實現以及kafka共識整個交易的流程,即主要介紹交易排序整個流程。在此基礎上,解決個人的一些疑問。

3. 交易排序處理

3.1 orderer作為生產者

首先,當傳送一個交易給orderer時,會呼叫orderer模組的broadcast()服務,其中會呼叫bh.ProcessMessage(msg, addr)方法根據交易型別呼叫不同的方法處理。

其中無論是配置交易還是普通交易都會呼叫chain.enqueue()方法,通過chain.producer.SendMessage(message)方法將交易寫入kafka。從而orderer作為生產者角色功能就是將客戶端發過來的交易寫入kafka。再次強調一下,每個通道對應一個topic,每個topic只有一個patition。

func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
    logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
    select {
    case <-chain.startChan: // The Start phase has completed
        select {
        case <-chain.haltChan: // The chain has been halted, stop here
            logger.Warningf("[channel: %s] consenter for this channel has been halted", chain.ChainID())
            return false
        default: // The post path
            payload, err := utils.Marshal(kafkaMsg)
            if err != nil {
                logger.Errorf("[channel: %s] unable to marshal Kafka message because = %s", chain.ChainID(), err)
                return false
            }
            message := newProducerMessage(chain.channel, payload)
            if _, _, err = chain.producer.SendMessage(message); err != nil {
                logger.Errorf("[channel: %s] cannot enqueue envelope because = %s", chain.ChainID(), err)
                return false
            }
            logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.ChainID())
            return true
        }
    default: // Not ready yet
        logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel hasn't started yet", chain.ChainID())
        return false
    }
}

3.2 orderer作為消費者

orderer作為消費者的功能為:將kafka對應topic裡面的交易打包成區塊,並寫入賬本。

其中,在orderer建立對應chain的時候呼叫chain.start()。

func (chain *chainImpl) Start() {
    go startThread(chain)
}

kafka會開啟協程go startThread(chain),其中會對kafka進行一系列初始化工作。最後會呼叫chain.processMessagesToBlocks()方法,生成對應區塊。

func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
    ...
    for {
        select {
        ...
        case in, ok := <-chain.channelConsumer.Messages():
            ...
            switch msg.Type.(type) {
            ...
            case *ab.KafkaMessage_Regular:
                if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
                    logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
                    counts[indexProcessRegularError]++
                } else {
                    counts[indexProcessRegularPass]++
                }
            }
            ...
    }
}

其中,會對chain.processRegular(msg.GetRegular(), in.Offset)訊息進行處理。

其中,會針對配置交易和普通交易進行分別處理。普通交易會呼叫chain.BlockCutter().Ordered(message)生成對應的batchs,配置交易會一個交易一個區塊,直接呼叫chain.BlockCutter().Cut()生成batch。然後再生成區塊、寫入賬本。

4. 問題思考

  1. kafka共識模式下動態更新系統通道配置新增orderer,是否就可提供排序服務。
    經檢視程式碼,在orderer啟動過程中,只有raft共識會判斷該orderer(raft節點)是否在對應對consenter叢集中。如果不在則會建立inactivechain,無法提供排序服務(必須更新consenter才行)。但是kafka不存在上述過程,在orderer啟動後,會從kafka同步系統通道區塊,當區塊包括建立通道交易時,會建立應用通道,同步應用通道區塊(該流程類似raft共識寫賬本流程)。因此,該orderer可以提供服務,但是如果是每個組織提供orderer的場景、由於沒有更新應用通道排序組織,從而導致無法通過服務發現獲取該orderer資訊。