Kafka in Depth: PartitionLeaderSelector Source Code Analysis

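PartitionLeaderSelector elects the leader broker for a partition. The trait defines a single method, selectLeader, which takes a TopicAndPartition object and a LeaderAndIsr object.
TopicAndPartition identifies the partition whose leader is being elected; the second argument is the partition's current leader and ISR as recorded in ZooKeeper. The method returns a tuple containing the newly elected leader and ISR, plus the set of replicas that need to receive the LeaderAndIsr request.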

trait PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
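To make the contract concrete, here is a minimal sketch of a selector written against simplified stand-in types. The case classes below are assumptions that only mirror the fields used in this article, not the real Kafka classes, and FirstInIsrSelector is a hypothetical selector invented purely for illustration.

```scala
// Simplified stand-ins for Kafka's types (assumptions, not the real kafka.* classes).
final case class TopicAndPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

trait PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

// Hypothetical selector: promotes the first ISR member to leader.
class FirstInIsrSelector(assignment: Map[TopicAndPartition, Seq[Int]]) extends PartitionLeaderSelector {
  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // .head throws on an empty ISR; the real selectors handle that case explicitly.
    val newLeader = currentLeaderAndIsr.isr.head
    val elected = currentLeaderAndIsr.copy(
      leader = newLeader,
      leaderEpoch = currentLeaderAndIsr.leaderEpoch + 1,
      zkVersion = currentLeaderAndIsr.zkVersion + 1)
    // Second element: the replicas that should receive the LeaderAndIsr request.
    (elected, assignment(topicAndPartition))
  }
}
```

The first element of the returned tuple is the new leader/ISR record, with the leader epoch and ZooKeeper version each bumped by one; the second is the list of replicas to notify. Every real selector in this article follows the same pattern.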

The main implementations of PartitionLeaderSelector are:
1. NoOpLeaderSelector
2. OfflinePartitionLeaderSelector
3. ReassignedPartitionLeaderSelector
4. PreferredReplicaPartitionLeaderSelector
5. ControlledShutdownLeaderSelector

[Figure: PartitionLeaderSelector class diagram]

NoOpLeaderSelector

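NoOpLeaderSelector is a LeaderSelector that does nothing: it logs a warning and returns the current LeaderAndIsr together with the partition's replica assignment.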

class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}

ControlledShutdownLeaderSelector

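When the controller receives a controlled shutdown request from a broker, it triggers a new leader election for the affected partitions.
The selector first looks up the assigned replicas (assignedReplicas) and keeps those whose brokers are live or shutting down (liveAssignedReplicas). It then removes the shutting-down brokers from the current ISR to get the new ISR, and picks the first replica in liveAssignedReplicas that is also in the new ISR as the partition's leader.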

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector
        with Logging {

  this.logIdent = "[ControlledShutdownLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion

    val currentLeader = currentLeaderAndIsr.leader

    // Assigned replicas (AR)
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    // Assigned replicas whose brokers are live or shutting down
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

    // Remove the shutting-down brokers from the current ISR
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    // Pick the first live assigned replica that is also in the new ISR as the partition leader
    liveAssignedReplicas.filter(newIsr.contains).headOption match {
      case Some(newLeader) =>
        // Found one: increment the leader epoch and the ZooKeeper node version by 1
        debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
      case None =>
        // None found: throw StateChangeFailedException
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}
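The selection logic can be tried out in isolation with a small sketch. It assumes a simplified LeaderAndIsr case class and plain collections in place of ControllerContext, and it returns None where the real selector throws StateChangeFailedException; the object and parameter names are made up for this example.

```scala
object ControlledShutdownSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (an assumption for this sketch).
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // Returns Some((newLeaderAndIsr, liveAssignedReplicas)), or None when no
  // replica outside the shutting-down brokers remains in the ISR.
  def select(assignedReplicas: Seq[Int],
             liveOrShuttingDown: Set[Int],
             shuttingDown: Set[Int],
             current: LeaderAndIsr): Option[(LeaderAndIsr, Seq[Int])] = {
    // Shutting-down brokers still count as live here, as in the original code.
    val liveAssigned = assignedReplicas.filter(r => liveOrShuttingDown.contains(r))
    // The new ISR excludes every shutting-down broker.
    val newIsr = current.isr.filter(b => !shuttingDown.contains(b))
    // First live assigned replica that survives in the new ISR becomes leader.
    liveAssigned.find(r => newIsr.contains(r)).map { newLeader =>
      (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1),
        liveAssigned)
    }
  }
}
```

For example, with assigned replicas Seq(1, 2, 3), all three brokers still registered, broker 1 shutting down, and a current state of LeaderAndIsr(1, 4, List(1, 2, 3), 10), select returns Some((LeaderAndIsr(2, 5, List(2, 3), 11), Seq(1, 2, 3))). Broker 1 leaves the ISR but stays in the list of replicas to notify.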

PreferredReplicaPartitionLeaderSelector

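When the controller receives a preferred replica leader election command, it tries to move leadership back to the preferred replica, that is, the first replica in the AR (assigned replicas) list. The election succeeds only if that replica is alive and in the ISR.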

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // Assigned replicas (AR)
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // The first replica in the assigned replica list is the preferred replica
    val preferredReplica = assignedReplicas.head
    // If the preferred replica is already the current leader, throw LeaderElectionNotNeededException
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) {
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
                                                   .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Trigerring preferred replica leader election")
      // check if preferred replica is not the current leader and is alive and in the isr
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        // The preferred replica is alive and in the ISR: make it the leader and increment the leader epoch and the ZooKeeper node version by 1
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        // The preferred replica is not alive or not in the ISR: throw StateChangeFailedException
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}
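The three possible outcomes are easy to see in a standalone sketch. The Outcome type and the object name are hypothetical, LeaderAndIsr is a simplified stand-in, and the current leader is read from the current argument rather than from controllerContext.partitionLeadershipInfo as the real code does.

```scala
object PreferredReplicaSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (an assumption for this sketch).
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // Hypothetical result type; the real selector signals the first two cases with exceptions.
  sealed trait Outcome
  case object NotNeeded extends Outcome            // LeaderElectionNotNeededException
  case object PreferredUnavailable extends Outcome // StateChangeFailedException
  final case class Elected(leaderAndIsr: LeaderAndIsr) extends Outcome

  def select(assignedReplicas: Seq[Int], liveBrokers: Set[Int], current: LeaderAndIsr): Outcome = {
    // The preferred replica is the first entry of the assigned replica list.
    val preferred = assignedReplicas.head
    if (current.leader == preferred) NotNeeded
    else if (liveBrokers.contains(preferred) && current.isr.contains(preferred))
      // The ISR is left unchanged; only leader, epoch, and version move.
      Elected(current.copy(leader = preferred,
        leaderEpoch = current.leaderEpoch + 1,
        zkVersion = current.zkVersion + 1))
    else PreferredUnavailable
  }
}
```

With assigned replicas Seq(1, 2, 3), all brokers alive, and a current state of LeaderAndIsr(2, 3, List(1, 2, 3), 8), the result is Elected(LeaderAndIsr(1, 4, List(1, 2, 3), 9)). If broker 1 is down or has dropped out of the ISR, the result is PreferredUnavailable and the current leader stays in place.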

ReassignedPartitionLeaderSelector

When a topic's partitions are reassigned, a new leader election is triggered: the first reassigned replica that is alive and in the ISR is elected leader.

class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "

  /**
   * The reassigned replicas are already in the ISR when selectLeader is called.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    // Replicas the partition is being reassigned to
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    // Keep only the reassigned replicas that are alive and in the current ISR
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
                                                                             currentLeaderAndIsr.isr.contains(r))
    // Pick the first of them as the leader
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match {
      // Found one: make it the leader, keep the ISR, and increment the leader epoch and the ZooKeeper node version by 1
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None =>
        // No reassigned replica is alive and in sync: throw NoReplicaOnlineException and fail the election
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}
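A rough standalone version of this selection is sketched below. It assumes a simplified LeaderAndIsr case class, takes the new replica list and live broker set as plain arguments, and returns Left with a message where the real selector throws NoReplicaOnlineException; all names are illustrative.

```scala
object ReassignedSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (an assumption for this sketch).
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def select(newReplicas: Seq[Int],
             liveBrokers: Set[Int],
             current: LeaderAndIsr): Either[String, (LeaderAndIsr, Seq[Int])] = {
    // Candidates: reassigned replicas that are alive and already in sync.
    val candidates = newReplicas.filter(r => liveBrokers.contains(r) && current.isr.contains(r))
    candidates.headOption match {
      case Some(newLeader) =>
        // The ISR is kept as-is; the new replica list is what gets notified.
        Right((current.copy(leader = newLeader,
          leaderEpoch = current.leaderEpoch + 1,
          zkVersion = current.zkVersion + 1), newReplicas))
      case None if newReplicas.isEmpty =>
        Left("list of reassigned replicas is empty")
      case None =>
        Left("none of the reassigned replicas are alive and in sync")
    }
  }
}
```

Suppose the partition moves to replicas Seq(4, 5, 6), brokers 1, 2, 5, and 6 are alive, and the current state is LeaderAndIsr(1, 2, List(1, 2, 4, 5, 6), 3). Broker 4 is down, so broker 5 is chosen and the call returns Right((LeaderAndIsr(5, 3, List(1, 2, 4, 5, 6), 4), Seq(4, 5, 6))).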

OfflinePartitionLeaderSelector

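Selects a new leader and a new ISR as follows:
1. If at least one broker in the ISR is alive, the first live assigned replica that is in the ISR becomes the leader, and the live ISR members become the new ISR.
2. If no broker in the ISR is alive and unclean.leader.election.enable=false, NoReplicaOnlineException is thrown.
3. If unclean.leader.election.enable=true, a broker outside the ISR may be elected: the first live replica in the AR list becomes the leader and the sole ISR member, at the risk of data loss.
4. If no replica in the AR list is alive either, NoReplicaOnlineException is thrown.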

class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true =>
            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
            // for unclean leader election.
            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }

            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true =>
                // No assigned replica is alive either: throw NoReplicaOnlineException
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false =>
                // Unclean election is enabled, so a broker outside the ISR may lead: pick the first live replica in the AR list
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false =>
            // At least one broker in the ISR is alive: elect the first live assigned replica in the ISR; the live ISR members form the new ISR
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
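The decision order in this selector can be sketched as a pure function. This is only an approximation under stated assumptions: LeaderAndIsr is a simplified stand-in, the unclean election setting is passed in as a boolean instead of being read from the topic configuration, and Left replaces NoReplicaOnlineException. The object and parameter names are invented for the example.

```scala
object OfflinePartitionSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (an assumption for this sketch).
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  def select(assignedReplicas: Seq[Int],
             liveBrokers: Set[Int],
             uncleanElectionEnabled: Boolean,
             current: LeaderAndIsr): Either[String, (LeaderAndIsr, Seq[Int])] = {
    val liveAssigned = assignedReplicas.filter(r => liveBrokers.contains(r))
    val liveIsr = current.isr.filter(r => liveBrokers.contains(r))
    val epoch = current.leaderEpoch + 1
    val version = current.zkVersion + 1
    if (liveIsr.nonEmpty) {
      // Clean election. Like the original, this assumes every ISR member is an assigned replica.
      val newLeader = liveAssigned.filter(r => liveIsr.contains(r)).head
      Right((LeaderAndIsr(newLeader, epoch, liveIsr, version), liveAssigned))
    } else if (!uncleanElectionEnabled) {
      Left("no broker in ISR is alive")
    } else if (liveAssigned.isEmpty) {
      Left("no assigned replica is alive")
    } else {
      // Unclean election: the new leader becomes the only ISR member, so data may be lost.
      val newLeader = liveAssigned.head
      Right((LeaderAndIsr(newLeader, epoch, List(newLeader), version), liveAssigned))
    }
  }
}
```

Take assigned replicas Seq(1, 2, 3) and a current state of LeaderAndIsr(1, 5, List(1, 3), 9). If brokers 2 and 3 are alive, broker 3 wins a clean election and the result is Right((LeaderAndIsr(3, 6, List(3), 10), Seq(2, 3))). If only broker 2 is alive, the call fails unless unclean election is enabled, in which case it returns Right((LeaderAndIsr(2, 6, List(2), 10), Seq(2))).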