1. 程式人生 > >kafka原始碼分析

kafka原始碼分析

原文地址:http://www.aboutyun.com/thread-9938-1-1.html

問題導讀
1.Kafka提供了Producer類作為java producer的api,此類有幾種傳送方式?
2.總結呼叫producer.send方法包含哪些流程?
3.Producer難以理解的在什麼地方?

producer的傳送方式剖析
Kafka提供了Producer類作為java producer的api,該類有sync和async兩種傳送方式。
sync架構圖


async架構


呼叫流程如下:


程式碼流程如下:
Producer:當new Producer(new ProducerConfig()),其底層實現,實際會產生兩個核心類的例項:Producer、DefaultEventHandler。在建立的同時,會預設new一個ProducerPool,即我們每new一個

java的Producer類,就會有建立Producer、EventHandler和ProducerPool,ProducerPool為連線不同kafka broker的池,初始連線個數有broker.list引數決定。
呼叫producer.send方法流程:
當應用程式呼叫producer.send方法時,其內部其實調的是eventhandler.handle(message)方法,eventHandler會首先序列化該訊息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
呼叫邏輯解釋:當
客戶端
應用程式呼叫producer傳送訊息messages時(既可以傳送單條訊息,也可以傳送List多條訊息),呼叫eventhandler.serialize首先序列化所有訊息,序列化操作使用者可以自定義實現Encoder介面,下一步呼叫partitionAndCollate根據topics的messages進行分組操作,messages分配給dataPerBroker(多個不同的Broker的Map),根據不同Broker呼叫不同的SyncProducer.send批量傳送訊息資料,SyncProducer包裝了nio網路操作資訊。
Producer的sync與async傳送訊息處理,大家看以上
架構
圖一目瞭然。
partitionAndCollate方法詳細作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分佈在哪個broker上),
建立一個HashMap>>>,把messages按照brokerId分組組裝資料,然後為SyncProducer分別傳送訊息作準備工作。

名稱解釋:partKey:分割槽關鍵字,當客戶端應用程式實現Partitioner介面時,傳入引數key為分割槽關鍵字,根據key和numPartitions,返回分割槽(partitions)索引。記住partitions分割槽索引是從0開始的。

Producer平滑擴容機制
如果開發過producer客戶端程式碼,會知道metadata.broker.list引數,它的含義是kafak broker的ip和port列表,producer初始化時,就連線這幾個broker,這時大家會有疑問,producer支援kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker資訊,答案是肯定的,producer可以支援平滑擴容broker,他是通過定時與現有的metadata.broker.list通訊,獲取新增broker資訊,然後把新建的SyncProducer放入ProducerPool中。等待後續應用程式呼叫。

複製程式碼
DefaultEventHandler類中初始化例項化BrokerPartitionInfo類,然後定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分程式碼如下:
  def handle(events: Seq[KeyedMessage[K,V]]) {
    ......
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      if (outstandingProduceRequests.size > 0) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        //休眠時間,多長時間重新整理一次
        Thread.sleep(config.retryBackoffMs)
        // 生產者定期請求重新整理最新topics的broker元資料資訊
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        .....
      }
    }
  }
複製程式碼

BrokerPartitionInfo的updateInfo方法程式碼如下:

複製程式碼
 def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    //根據topics列表,meta.broker.list,其他配置引數,correlationId表示請求次數,一個計數器引數而已
    //建立一個topicMetadataRequest,並隨機的選取傳入的broker資訊中任何一個去取metadata,直到取到為止
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }
複製程式碼

ClientUtils.fetchTopicMetadata方法程式碼:

複製程式碼
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
    var fetchMetaDataSucceeded: Boolean = false
    var i: Int = 0
    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
    var topicMetadataResponse: TopicMetadataResponse = null
    var t: Throwable = null
    val shuffledBrokers = Random.shuffle(brokers) //生成隨機數
    while(i 
ProducerPool的updateProducer
def updateProducer(topicMetadata: Seq[TopicMetadata]) {
    val newBrokers = new collection.mutable.HashSet[Broker]
    topicMetadata.foreach(tmd => {
      tmd.partitionsMetadata.foreach(pmd => {
        if(pmd.leader.isDefined)
          newBrokers+=(pmd.leader.get)
      })
    })
    lock synchronized {
      newBrokers.foreach(b => {
        if(syncProducers.contains(b.id)){
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }
  }
複製程式碼

當我們啟動kafka broker後,並且大量producer和consumer時,經常會報如下異常資訊。

  1. [email protected]:/opt/soft$ Closing socket connection to 192.168.11.166
複製程式碼




筆者也是經常很長時間看原始碼分析,才明白了為什麼ProducerConfig配置資訊裡面並不要求使用者提供完整的kafka叢集的broker資訊,而是任選一個或幾個即可。因為他會通過您選擇的broker和topics資訊而獲取最新的所有的broker資訊。
值得了解的是用於傳送TopicMetadataRequest的SyncProducer雖然是用ProducerPool.createSyncProducer方法建出來的,但用完並不還回ProducerPool,而是直接Close.


重難點理解:
重新整理metadata並不僅在第一次初始化時做。為了能適應kafka broker執行中因為各種原因掛掉、paritition改變等變化,
eventHandler會定期的再去重新整理一次該metadata,重新整理的間隔用引數topic.metadata.refresh.interval.ms定義,預設值是10分鐘。
這裡有三點需要強調:

客戶端呼叫send, 才會新建SyncProducer,只有呼叫send才會去定期重新整理metadata在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完後再close。根據當前SyncProducer(一個Broker的連線)取得的最新的完整的metadata,重新整理ProducerPool中到broker的連線.每10分鐘的重新整理會直接重新把到每個broker的socket連線重建,意味著在這之後的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把topic.metadata.refresh.interval.ms值改為-1,這樣只有在傳送失敗時,才會重新重新整理。Kafka的叢集中如果某個partition所在的broker掛了,可以檢查錯誤後重啟重新加入叢集,手動做rebalance,producer的連線會再次斷掉,直到rebalance完成,那麼重新整理後取到的連線著中就會有這個新加入的broker。


說明:每個SyncProducer例項化物件會建立一個socket連線


特別注意:
在ClientUtils.fetchTopicMetadata呼叫完成後,回到BrokerPartitionInfo.updateInfo繼續執行,在其末尾,pool會根據上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的,即每一個SyncProducer對應一個broker,內部封了一個到該broker的socket連線。每次重新整理時,會把已存在SyncProducer給close掉,即關閉socket連線,然後新建SyncProducer,即新建socket連線,去覆蓋老的。
如果不存在,則直接建立新的。