1. 程式人生 > >kafka原始碼分析之kafkaApis

kafka原始碼分析之kafkaApis

KafkaApis

說明:用於處理對kafka的訊息請求的中心轉發元件,kafkaapis需要依賴於如下幾個元件:

apis = new KafkaApis(socketServer.requestChannelreplicaManager

consumerCoordinator,kafkaControllerzkUtilsconfig.brokerIdconfigmetadataCachemetrics,  

authorizer)

其最核心的處理主要由KafkaApis中的handle函式進行排程.

請求處理池

在KafkaApis例項生成後,會同時生成一個KafkaRequestHandlerPool

例項.

這個例項主要用於對kafka的請求進行處理的例項,需要依賴如下幾個元件與配置:

配置項num.io.threads,預設值8,用於處理IO操作的執行緒個數.

requestHandlerPool new KafkaRequestHandlerPool(config.brokerId,

socketServer.requestChannelapisconfig.numIoThreads)

這裡會根據io的執行緒個數,生成對應的處理執行緒KafkaRequestHandler.

this.logIdent "[Kafka Request Handler on Broker " 

+ brokerId + "], "val threads new Array[Thread](numThreads)val runnables new Array[KafkaRequestHandler](numThreads)for(i <- until numThreads) {runnables(i) = new KafkaRequestHandler(ibrokerIdaggregateIdleMeternumThreadsrequestChannelapis)threads(i) = Utils.daemonThread("kafka-request-handler-" 
+ irunnables(i))threads(i).start()}

接下來看看KafkaRequestHandler執行緒:

def run() {while(true) {try {

這裡從請求佇列中,取出一個請求,直接交給KafkaApis進行處理.var req : RequestChannel.Request = null      while (req == null) {// We use a single meter for aggregate idle percentage for the thread pool.        // Since meter is calculated as total_recorded_value / time_window and        // time_window is independent of the number of threads, each recorded idle        // time should be discounted by # threads.val startSelectTime = SystemTime.nanosecondsreq = requestChannel.receiveRequest(300)val idleTime = SystemTime.nanoseconds - startSelectTime        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)      }if(req eq RequestChannel.AllDone) {        debug("Kafka request handler %d on broker %d received shut down 

          command".format(          idbrokerId))return}      req.requestDequeueTimeMs = SystemTime.millisecondstrace("Kafka request handler %d on broker %d handling request %s".format(id

brokerIdreq))      apis.handle(req)    } catch {case e: Throwable => error("Exception when handling request"e)    }  }}

對網路請求進行處理

這個部分通過KafkaApis中的handle函式進行處理,並根據不同的請求路由進行不同的處理.

處理metadata更新請求

當某個partition發生變化後,會通過生成UpdateMetadataRequest請求向所有的brokers傳送這個請求,也就是說每一個活著的broker都會接受到metadata變化的請求,並對請求進行處理.

這個處理在partition的狀態發生變化,partition重新分配,broker的啟動與停止時,會發起update metadata的請求.

入口通過KafkaApis中的handle函式

caseRequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)

接下來看看handleUpdateMetadataRequest的函式處理流程:

def handleUpdateMetadataRequest(request: RequestChannel.Request) {val updateMetadataRequest = 

        request.requestObj.asInstanceOf[UpdateMetadataRequest]

首先檢查當前的使用者是否有ClusterAction操作的許可權,如果有接著執行下面的流程。  authorizeClusterAction(request)

根據請求的metadata的更新訊息,更新對memtadataCache中的內容。這個包含有broker的新增與刪除,partition的狀態更新等。  replicaManager.maybeUpdateMetadataCache(updateMetadataRequestmetadataCache)val updateMetadataResponse = new UpdateMetadataResponse(

              updateMetadataRequest.correlationId)

  requestChannel.sendResponse(new Response(request

new RequestOrResponseSend(request.connectionIdupdateMetadataResponse)))}

看看ReplicaManager中處理對更新metadata的請求的流程:

在副本管理元件中,直接通過MetadataCache中的updateCache函式對請求過來的訊息進行處理,用於更新當前的broker中的cache資訊。

更新cache的流程:

1,更新cache中用於儲存所有的broker節點的aliveBrokers集合。

2,對請求過來的修改過狀態的partition的集合進行迭代,

2,1,如果partition的leader的節點被標記為-2,表示這是一個被刪除的partition,從cache集合中找到這個partition對應的topic的子集合,並從這個集合中移出這個partition,如果這個topic中已經不在包含partition時,從cache中直接移出掉這個topic.

2,2,這種情況下,表示是對partition的狀態的修改,包含partition的副本資訊,與partition的leader的isr的資訊,直接更新cache集合中topic子集合中對應此partition的狀態資訊。

def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest

metadataCache: MetadataCache) {replicaStateChangeLock synchronized {if(updateMetadataRequest.controllerEpoch < controllerEpoch) {val stateControllerEpochErrorMessage = ("Broker %d received update metadata 

        request with correlation id %d from an " +"old controller %d with epoch %d. Latest known controller epoch is %d")

        .format(localBrokerId,updateMetadataRequest.correlationIdupdateMetadataRequest.controllerId

updateMetadataRequest.controllerEpoch,controllerEpoch)stateChangeLogger.warn(stateControllerEpochErrorMessage)throw new ControllerMovedException(stateControllerEpochErrorMessage)    } else {      metadataCache.updateCache(updateMetadataRequestlocalBrokerId

stateChangeLogger)controllerEpoch = updateMetadataRequest.controllerEpoch    }  }}

處理partitionLeaderAndIsr請求

這個請求主要是針對partitionleader或者isr發生變化後的請求處理.這個接收請求的broker節點一定會是包含有對應的partition的副本的節點才會被接收到資料.

caseRequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

接下來看看handleLeaderAndIsrRequest的處理流程:

defhandleLeaderAndIsrRequest(request: RequestChannel.Request) {

首先先得到請求的內容.針對一個LeaderAndIsr的請求,得到的請求內容是一個LeaderAndIsrRequest的例項.val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]

檢查當前的請求使用者是否具備ClusterAction操作的許可權.  authorizeClusterAction(request)try {

這個函式用於在partition的isr被改變後,對成為leader的副本與成為follower的副本判斷這個副本對應的topic是否是內建的__consumer_offsets topic,通過GroupMetadataManager中的對應函式來處理內建的topic的leader上線與下線的操作.def onLeadershipChange(updatedLeaders: Iterable[Partition],

updatedFollowers: Iterable[Partition]) {updatedLeaders.foreach { partition =>if (partition.topic == GroupCoordinator.GroupMetadataTopicName)          coordinator.handleGroupImmigration(partition.partitionId)      }      updatedFollowers.foreach { partition =>if (partition.topic == GroupCoordinator.GroupMetadataTopicName)          coordinator.handleGroupEmigration(partition.partitionId)      }    }

根據請求的partition,通過副本管理元件來對partition進行leader或者follower的選擇.// call replica manager to handle updating partitions to become leader or followerval result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest

metadataCacheonLeadershipChange)val leaderAndIsrResponse = new LeaderAndIsrResponse(

           leaderAndIsrRequest.correlationId,

           result.responseMapresult.errorCode)

生成操作成功後的返回結果,並向請求方進行響應.requestChannel.sendResponse(new Response(request,

new RequestOrResponseSend(request.connectionIdleaderAndIsrResponse)))

  } catch {case e: KafkaStorageException =>      fatal("Disk error during leadership change."e)Runtime.getRuntime.halt(1)  }}

ReplicaManager中的becomeLeaderOrFollower函式:

這個函式用於判斷指定的partition是應該成為leader還是應該成為follower.def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,metadataCache: MetadataCache,onLeadershipChange: (Iterable[Partition]

Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {

  leaderAndISRRequest.partitionStateInfos.foreach { case (

(topicpartition)stateInfo) =>stateChangeLogger.trace("日誌)  }

replicaStateChangeLock synchronized {val responseMap = new mutable.HashMap[(String, Int), Short]

如果當前請求的epoch的值小於當前controllerEpoch的值,列印warn的日誌,

並返回StaleControllerEpochCode錯誤程式碼.if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {      leaderAndISRRequest.partitionStateInfos.foreach { 

case ((topicpartition)stateInfo) =>stateChangeLogger.warn(("日誌)      }

BecomeLeaderOrFollowerResult(responseMap

ErrorMapping.StaleControllerEpochCode)    } else {

這裡得到當前請求的最新的epoch的值,並設定當前的broker的epoch的值為請求的值,val controllerId = leaderAndISRRequest.controllerIdval correlationId = leaderAndISRRequest.correlationIdcontrollerEpoch = leaderAndISRRequest.controllerEpoch

對請求的所有的partition進行迭代,並對partition的狀態進行檢查.// First check partition's leader epochval partitionState = new mutable.HashMap[PartitionPartitionStateInfo]()      leaderAndISRRequest.partitionStateInfos.foreach {

case ((topicpartitionId)partitionStateInfo) =>

這裡通過getOrCreatePartition從allPartitions集合中得到這個partition的例項,如果這個partition的例項在集合中不存在時,會建立這個例項.val partition = getOrCreatePartition(topicpartitionId)val partitionLeaderEpoch = partition.getLeaderEpoch()

檢查當前的partition中的leaderEpoch的值是否小於新請求的值,如果小於這個值,同時這個partition對應的副本包含有當前的broker時,把這個partition與狀態新增到partitionState的集合中,否則表示當前的broker中不包含這個partition的副本,列印一個日誌,並在responseMap中記錄這個partition的error code為UnknownTopicOrPartitionCode.如果partition的leaderEpoch的值大於或等於請求的epoch的值,列印日誌,並在responseMap中新增這個partition的error code的值為StaleLeaderEpochCode.if (partitionLeaderEpoch < partitionStateInfo.

             leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {if(partitionStateInfo.allReplicas.contains(config.brokerId))              partitionState.put(partitionpartitionStateInfo)else {stateChangeLogger.warn(("日誌)

              responseMap.put((topicpartitionId),               

ErrorMapping.UnknownTopicOrPartitionCode)            }          } else {// Otherwise record the error code in responsestateChangeLogger.warn(("日誌)            responseMap.put((topicpartitionId)ErrorMapping.StaleLeaderEpochCode)          }      }

這裡根據partition的副本包含有當前的broker節點的所有的partition的集合,得到這個partition的leader是當前的broker的所有的partition的集合,同時得到包含有當前的broker的副本的partition中,leader不是當前的broker的所有的partition的集合.val partitionsTobeLeader = partitionState.filter {

case (partitionpartitionStateInfo) =>           partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 

             config.brokerId}val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

如果partitions需要被切換成leader的集合不為空,對這些需要在當前的broker中的partition執行leader操作的集合執行makeLeaders函式.這裡得到的集合是partition中當前broker被搞成leader的partition集合,val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)        makeLeaders(controllerIdcontrollerEpochpartitionsTobeLeader

leaderAndISRRequest.correlationIdresponseMap)elseSet.empty[Partition]

如果partitions需要被切換成follower的集合不為空,對這些需要在當前的broker中的partition執行follower操作的集合執行makeFollowers函式.這裡得到的集合是partition中當前broker被搞成follower的partition集合,val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)        makeFollowers(controllerIdcontrollerEpochpartitionsToBeFollower

leaderAndISRRequest.correlationIdresponseMapmetadataCache)elseSet.empty[Partition]

更新每個partition中最後一個offset的值到日誌目錄下的checkpoint檔案中.if (!hwThreadInitialized) {        startHighWaterMarksCheckPointThread()hwThreadInitialized true}

停止掉沒有partition引用的fetcher的執行緒.這個執行緒用於對partition的訊息的同步,從leader的partition中同步資料到follower中.replicaFetcherManager.shutdownIdleFetcherThreads()

這裡根據當前節點是leader的partition集合與當前節點變成follower的partition集合,檢查這些partition對應的topic是否是__consumer_offsets topic,這個topic用來記錄每個consumer對應的消費的offset的資訊,如果是這個topic的partition時,根據leader與follower的集合,通過GroupMetadataManager例項對兩個集合分別執行partition的leader的上線與下線的操作.      onLeadershipChange(partitionsBecomeLeaderpartitionsBecomeFollower)BecomeLeaderOrFollowerResult(responseMapErrorMapping.NoError)    }  }}

Partitionleader設定

在LeaderAndIsr的請求過來時,如果請求的訊息中對應的partition的leader是當前的broker節點時,會根據這個partitions的集合執行ReplicaManager.makeLeaders的操作,

 */private def makeLeaders(controllerId: Int,epoch: Int,partitionState: Map[PartitionPartitionStateInfo],correlationId: Int,responseMap: mutable.Map[(String, Int), Short])

: Set[Partition] = {  partitionState.foreach(state =>stateChangeLogger.trace(這裡列印日誌)

對所有要把當前節點設定成leader的partition設定為NoError的錯誤程式碼,這個表示沒有錯誤.for (partition <- partitionState.keys)    responseMap.put((partition.topicpartition.partitionId)

ErrorMapping.NoError)val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()try {

這裡從同步partition的訊息的執行緒中移出需要在當前的broker中成為Leader的partition.// First stop fetchers for all the partitionsreplicaFetcherManager.removeFetcherForPartitions(

       partitionState.keySet.map(new TopicAndPartition(_)))

這裡根據需要在當前的broker中設定成leader的partition的集合進行迭代,根據迭代的Partition例項中的makeLeader函式來設定partition的leader,並得到成功設定leader的所有的partition的集合(如果partition的leader本身就在這個broker上,這個函式返回的值為false).這個函式的最後返回這個被生成設定leader的partitions的集合.// Update the partition information to be the leaderpartitionState.foreach{ case (partitionpartitionStateInfo) =>if (partition.makeLeader(controllerIdpartitionStateInfocorrelationId))        partitionsToMakeLeaders += partitionelsestateChangeLogger.info(("日誌"));}    partitionsToMakeLeaders.foreach { partition =>stateChangeLogger.trace(日誌)    }  } catch {case e: Throwable =>      partitionState.foreach { state =>v