kafka原始碼解析之十二KafkaController(上篇)

kafka原始碼解析之十二KafkaController(上篇)

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
……
// Leader elector backed by the ZK path ZkUtils.ControllerPath (/controller):
// onControllerFailover runs when this broker wins the election,
// onControllerResignation runs when it gives up leadership.
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  onControllerResignation, config.brokerId)

/**
 * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
 * is the controller. It merely registers the session expiration listener and starts the controller leader
 * elector
 */
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up");
    registerSessionExpirationListener() // register a listener for ZK session expiration
    isRunning = true
    controllerElector.startup // start the elector, which triggers the first election
    info("Controller startup complete")
  }
}
}
其zk選舉的路徑為/controller/*,並且對zk叢集建立一個會話超時的listener
class SessionExpirationListener() extends IZkStateListener with Logging {
  this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
  @throws(classOf[Exception])
  def handleStateChanged(state: KeeperState) {
    // do nothing, since zkclient will do reconnect for us.
  }
  /**
   * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
   * any ephemeral nodes here.
   *
   * @throws Exception
   *             On any error.
   */
  @throws(classOf[Exception])
  def handleNewSession() {
    info("ZK expired; shut down all controller components and try to re-elect")
    inLock(controllerContext.controllerLock) {
      // The old session's ephemeral /controller node is gone with the session.
      // First run the resignation callback that was registered with
      // ZookeeperLeaderElector to tear down controller state, then compete
      // in a fresh election.
      onControllerResignation()
      controllerElector.elect
    }
  }
}
因此重點關注ZookeeperLeaderElector內部的邏輯:
/**
 * Leader elector based on an ephemeral ZooKeeper node at `electionPath`
 * (for the controller this is /controller). Among concurrent contenders,
 * exactly one broker succeeds in creating the node and becomes leader; when
 * its session ends the node disappears and the others re-elect via
 * LeaderChangeListener.
 *
 * Note: session expiration is NOT handled here; the caller is expected to
 * install its own session expiration listener (see KafkaController).
 */
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  // Cached id of the current leader; -1 means "no leader known".
  var leaderId = -1
  // create the election path in ZK, if one does not exist
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  /** Subscribes to data changes on the election path and runs the first election. */
  def startup {
    inLock(controllerContext.controllerLock) {
      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect
    }
  }

  /** Reads the current leader id from ZK, or -1 if the election node does not exist. */
  private def getControllerID(): Int = {
    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => -1
    }
  }

  /**
   * Attempts to become leader by creating the ephemeral election node.
   * @return true if this broker is the leader after the attempt
   */
  def elect: Boolean = {
    val timestamp = SystemTime.milliseconds.toString
    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

    leaderId = getControllerID
    /*
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
     * it's possible that the controller has already been elected when we get here. This check will prevent the following
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if(leaderId != -1) {
       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
       return amILeader
    }

    try {
      // Creating the ephemeral node IS the election: of all concurrent creators
      // exactly one succeeds, and the node vanishes with its session so the
      // remaining brokers can compete again later.
      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
        (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
        controllerContext.zkSessionTimeout)
      info(brokerId + " successfully elected as leader")
      leaderId = brokerId
      onBecomingLeader()
    } catch {
      case e: ZkNodeExistsException =>
        // If someone else has written the path, record who won.
        leaderId = getControllerID

        if (leaderId != -1)
          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
        else
          warn("A leader has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
        // Clear cached state and delete the (possibly half-initialized) election
        // node so another broker can take over.
        resign()
    }
    amILeader
  }

  def close = {
    leaderId = -1
  }

  def amILeader : Boolean = leaderId == brokerId

  /** Gives up leadership: forget the cached leader and delete the election node. */
  def resign() = {
    leaderId = -1
    deletePath(controllerContext.zkClient, electionPath)
  }

  /**
   * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
   * have its own session expiration listener and handler
   */
  class LeaderChangeListener extends IZkDataListener with Logging {
    /**
     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      inLock(controllerContext.controllerLock) {
        val amILeaderBeforeDataChange = amILeader
        leaderId = KafkaController.parseControllerId(data.toString)
        info("New leader is %d".format(leaderId))
        // FIX (cf. KAFKA-1451): if this broker was the leader but the node now
        // names a different broker, it must resign so its stale controller-side
        // state is torn down; the original code only updated leaderId.
        if (amILeaderBeforeDataChange && !amILeader)
          onResigningAsLeader()
      }
    }

    /**
     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
      // A broker that lost an earlier election lands here when the current
      // leader's ephemeral node disappears, and re-runs the election.
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if(amILeader)
          // This broker believed it was leader; clear all leader-side state
          // before competing again, since it may not win this round.
          onResigningAsLeader()
        elect
      }
    }
  }
}
因此KafkaController成為leader分2種情況:
1. 第一次啟動的時候會主動觸發elect,如果被選舉成為leader,則做leader該做的事情
2. 第一次啟動的時候選舉失敗,則通過LeaderChangeListener監控/controller/*路徑,發現下面資料被刪除的時候,觸發handleDataDeleted,從而再次觸發選舉

12.2 KafkaController的初始化(leader)

從上節可以看到,KafkaController選舉成功則呼叫onBecomingLeader,當之前的leader再次觸發選舉的時候呼叫onResigningAsLeader,以上2個函式分別對應:onControllerFailover和onControllerResignation。
onControllerResignation很簡單,就是把裡面所有的模組shutdown或者登出掉:
/**
 * Invoked when this broker ceases to be the controller (e.g. after a ZK
 * session expiration triggers re-election). Shuts down or de-registers every
 * controller-side module and resets the cached controller context, leaving the
 * broker in a clean state from which it can be elected controller again.
 * NOTE(review): the teardown order here mirrors the startup order in
 * onControllerFailover in reverse; do not reorder.
 */
def onControllerResignation() {
  // de-register listeners
  deregisterReassignedPartitionsListener()
  deregisterPreferredReplicaElectionListener()
  // shutdown delete topic manager
  if (deleteTopicManager != null)
    deleteTopicManager.shutdown()
  // shutdown leader rebalance scheduler
  if (config.autoLeaderRebalanceEnable)
    autoRebalanceScheduler.shutdown()
  inLock(controllerContext.controllerLock) {
    // de-register partition ISR listener for on-going partition reassignment task
    deregisterReassignedPartitionsIsrChangeListeners()
    // shutdown partition state machine
    partitionStateMachine.shutdown()
    // shutdown replica state machine
    replicaStateMachine.shutdown()
    // shutdown controller channel manager
    if(controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
    // reset controller context
    controllerContext.epoch=0
    controllerContext.epochZkVersion=0
    // this broker keeps running as an ordinary (non-controller) broker
    brokerState.newState(RunningAsBroker)
  }
}
以上各種模組會在onControllerFailover介紹,onControllerFailover本質上就是開啟裡面所有的功能。
onControllerFailover的邏輯如下:
 /**
  * Invoked when this broker wins the controller election. Bumps the controller
  * epoch, registers the controller's ZK listeners, initializes the controller
  * context and the partition/replica state machines, and resumes any pending
  * partition reassignment or preferred-replica election work.
  * Does nothing but log if the controller module has already been shut down.
  */
 def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))
      readControllerEpochFromZookeeper()
      // Election epoch: incremented by 1 on every successful election.
      incrementControllerEpoch(zkClient)
      // Leader-side initialization; each step is described in the commentary below.
      registerReassignedPartitionsListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
      // watch every known topic for partition changes
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
      brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      // push current cluster metadata to all live (or shutting-down) brokers
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }
其中步驟如下:
1) 在/admin/reassign_partitions目錄註冊partitionReassignedListener監聽函式
2) 在/admin/preferred_replica_election目錄註冊preferredReplicaElectionListener監聽函式
3) 在/brokers/topics目錄註冊topicChangeListener監聽函式
4) 在/admin/delete_topics目錄註冊deleteTopicsListener監聽函式
5) 在/brokers/ids目錄註冊brokerChangeListener監聽函式
6) 初始化ControllerContext上下文,裡面包含了topic的各種元資料資訊,除此之外ControllerContext內部的ControllerChannelManager負責和kafka叢集內部的其它KafkaServer建立channel來進行通訊,TopicDeletionManager
負責刪除topic
7)通過replicaStateMachine初始化所有的replica狀態
8)通過partitionStateMachine初始化所有的partition狀態
9) 在/brokers/topics/***(具體的topic名字)/目錄下註冊AddPartitionsListener函式
10) 處理之前啟動留下的partition重分配的情況
11) 處理之前啟動留下的replica重新選舉的情況
12) 向其它KafkaServer傳送叢集topic的元資料資訊,以進行資料的同步更新
13)根據配置是否開啟自動均衡
14)開始刪除topic
KafkaController主要通過以上各種監聽函式來完成kafka叢集元資料的管理,接下來先詳細描述PartitionStateMachine和ReplicaStateMachine原理,因為kafka topic 的partition狀態和內容主要是通過以上2個管理類來實現的,然後按照上面的流程描述不同的listener的作用。