1. 程式人生 > >跟我學Kafka之Controller控制器詳解

跟我學Kafka之Controller控制器詳解

作者:小程

我們的kafka原始碼分享已經進行過很多期了,主要的內容也都分享的差不多了,那麼在今後的分享中,主要集中在kafka效能優化和使用。

Kafka叢集中的其中一個Broker會被選舉為Controller,主要負責Partition管理和副本狀態管理,也會執行類似於重分配Partition之類的管理任務。如果當前的Controller失敗,會從其他正常的Broker中重新選舉Controller。

進入KafkaController.scala檔案看到如下程式碼:

class KafkaController(val config : KafkaConfig, zkClient: ZkClient
, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val
partitionStateMachine =
new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId) // have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) var deleteTopicManager: TopicDeletionManager = null val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)

在KafkaController類中定義了很多屬性,我們先重點了解下面的PartitionLeaderSelector物件,主要是為分割槽選舉出leader broker,該trait只定義了一個方法selectLeader,接收一個TopicAndPartition物件和一個LeaderAndIsr物件。TopicAndPartition表示要選leader的分割槽,而第二個引數表示zookeeper中儲存的該分割槽的當前leader和ISR記錄。該方法會返回一個元組包括了推舉出來的leader和ISR以及需要接收LeaderAndISr請求的一組副本。

trait PartitionLeaderSelector {  
    def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

通過我們上面的程式碼,可以看到在KafkaController中共定義了五種selector選舉器:

  • 1、NoOpLeaderSelector
  • 2、OfflinePartitionLeaderSelector
  • 3、ReassignedPartitionLeaderSelector
  • 4、PreferredReplicaPartitionLeaderSelector
  • 5、ControlledShutdownLeaderSelector

我們在解釋這五個選擇器之前,先了解一下在Kafka中Partition的四種狀態:

  • NonExistentPartition —— 這個狀態表示該分割槽要麼沒有被建立過或曾經被建立過但後面被刪除了。
  • NewPartition —— 分割槽建立之後就處於NewPartition狀態。在這個狀態中,分割槽應該已經分配了副本,但是還沒有選舉出leader和ISR。
  • OnlinePartition —— 一旦分割槽的leader被推選出來,它就處於OnlinePartition狀態。
  • OfflinePartition —— 如果leader選舉出來後,leader broker宕機了,那麼該分割槽就處於OfflinePartition狀態。

四種狀態的轉換關係如下:

NonExistentPartition -> NewPartition

  1. 首先將第一個可用的副本broker作為leader broker並把所有可用的副本物件都裝入ISR,然後寫leader和ISR資訊到zookeeper中儲存
  2. 對於這個分割槽而言,傳送LeaderAndIsr請求到每個可用的副本broker,以及UpdateMetadata請求到每個可用的broker上

OnlinePartition, OfflinePartition -> OnlinePartition

為該分割槽選取新的leader和ISR以及接收LeaderAndIsr請求的一組副本,然後寫入leader和ISR資訊到zookeeper中儲存。

NewPartition, OnlinePartition -> OfflinePartition

標記分割槽狀態為離線(offline)。

OfflinePartition -> NonExistentPartition

離線狀態標記為不存在分割槽,表示該分割槽失敗或者被刪除。

在介紹完最基本的概念之後,下面我們將重點介紹上面提到過的五種選舉器:
1、ReassignedPartitionLeaderSelector
從可用的ISR中選取第一個作為leader,把當前的ISR作為新的ISR,將重分配的副本集合作為接收LeaderAndIsr請求的副本集合。
2、PreferredReplicaPartitionLeaderSelector
如果從assignedReplicas取出的第一個副本就是分割槽leader的話,則丟擲異常,否則將第一個副本設定為分割槽leader。
3、ControlledShutdownLeaderSelector
將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,然後選取第一個副本作為leader,然後令當前AR作為接收LeaderAndIsr請求的副本。
4、NoOpLeaderSelector
原則上不做任何事情,返回當前的leader和isr。
5、OfflinePartitionLeaderSelector
從活著的ISR中選擇一個broker作為leader,如果ISR中沒有活著的副本,則從assignedReplicas中選擇一個副本作為leader,leader選舉成功後註冊到Zookeeper中,並更新所有的快取。

所有的leader選擇完成後,都要通過請求把具體的request路由到對應的handler處理。目前kafka並沒有把handler抽象出來,而是每個handler都是一個函式,混在KafkaApi類中。
其實也就是如下的程式碼:

def handle(request: RequestChannel.Request) {  
  try{  
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)  
    request.requestId match {  
      case RequestKeys.ProduceKey => handleProducerRequest(request)  // producer  
      case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer  
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)  
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)  
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //成為leader或follower設定同步副本組資訊  
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)  
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)  
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)  //shutdown broker  
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)  
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)  
      case requestId => throw new KafkaException("Unknown api code " + requestId)  
    }  
  } catch {  
    case e: Throwable =>  
      request.requestObj.handleError(e, requestChannel, request)  
      error("error when handling request %s".format(request.requestObj), e)  
  } finally  
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds  
}

這裡面的每個請求在上面給出的連結的文章中都有過解釋說明,在這裡不多解釋。

RequestKeys.LeaderAndIsr詳細分析
在上面的程式碼中咱們看到ReequestKeys.LeaderAndlst對應的方法其實是KeyhandleLeaderAndIsrRequest。

def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
    try {
      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }

將request.requestObj轉換成LeaderAndIstRequest物件型別。

Sample Flowchart Template.png

流程圖說明

1、如果請求中controllerEpoch小於當前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。

2、如果partitionStateInfo中的leader epoch大於當前ReplicManager中儲存的(topic, partitionId)對應的partition的leader epoch,則:

2.1、如果當前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名為partitionState的HashMap中。
否則說明該Broker不在該Partition分配的Replica list中,將該資訊記錄於log中

3、如果partitionStateInfo中的leader epoch小於當前ReplicManager則將相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

4、篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
如果partitionsTobeLeader不為空,則對其執行makeLeaders方。
如果partitionsToBeFollower不為空,則對其執行makeFollowers方法。


小程故事多

小程故事多

支付領域專家,關注分散式,大資料等技術
個人技術部落格:flychao88.iteye.com