
Kafka Source Code Analysis (4)

4. Replication Subsystem

1. Replica

Replica is the smallest unit by which Kafka distributes data. The main code is as follows:

class Replica(val brokerId: Int,
              val partition: Partition,
              time: Time = SystemTime,
              initialHighWatermarkValue: Long = 0L,
              val log: Option[Log] = None) extends Logging {
  // the high watermark offset value, in non-leader replicas only its message offsets are kept
  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
  // the log end offset value, kept in all replicas;
  // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
  // the time when log offset is updated
  private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)

  val topic = partition.topic
  val partitionId = partition.partitionId

……………………………………

  override def equals(that: Any): Boolean = {
    if(!(that.isInstanceOf[Replica]))
      return false
    val other = that.asInstanceOf[Replica]
    if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
      return true
    false
  }

  override def hashCode(): Int = {
    31 + topic.hashCode() + 17*brokerId + partition.hashCode()
  }

……………………………………

}

Its main members are the following:

highWatermarkMetadata: the high watermark (HW for short). It is essentially just an offset; every (consumer, topic, partition) combination records such an offset, which is the metadata used to track the consumer's consumption progress.

logEndOffsetMetadata: the largest offset in the log (LEO for short). If the replica is local to this broker, the value is the end offset of the local log file; otherwise it is the offset this broker learns from the follower's fetch requests. It is also worth noting that both fields above are annotated with @volatile, so in a multi-threaded environment every thread that reads them sees the latest value in memory (a sketch of how HW and LEO relate follows this list).

logEndOffsetUpdateTimeMsValue: the meaning is self-evident, i.e. the time at which the log end offset was last updated.

topic: the topic of the partition.

partitionId: the id of the partition.
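To make the relationship between HW and LEO concrete, here is a minimal sketch rather than Kafka's actual code: the SimpleReplica class and the maybeAdvanceHighWatermark helper are hypothetical, and the sketch assumes the usual rule that the leader advances its HW to the smallest LEO across itself and the ISR.

// A hypothetical, simplified model of HW vs. LEO; not the actual Kafka classes.
class SimpleReplica(val brokerId: Int) {
  // @volatile so every thread sees the latest offsets, as in kafka.cluster.Replica
  @volatile var logEndOffset: Long = 0L   // LEO: highest offset appended/fetched so far
  @volatile var highWatermark: Long = 0L  // HW: highest offset known to be fully replicated
}

object HighWatermarkSketch {
  // Assumption: the leader's HW is the minimum LEO across itself and the ISR,
  // so consumers never see messages that are not yet replicated to the whole ISR.
  def maybeAdvanceHighWatermark(leader: SimpleReplica, isr: Seq[SimpleReplica]): Unit = {
    val newHw = (leader +: isr).map(_.logEndOffset).min
    if (newHw > leader.highWatermark)
      leader.highWatermark = newHw
  }

  def main(args: Array[String]): Unit = {
    val leader = new SimpleReplica(0)
    val follower = new SimpleReplica(1)
    leader.logEndOffset = 10L   // leader has appended up to offset 10
    follower.logEndOffset = 7L  // follower has only fetched up to offset 7
    maybeAdvanceHighWatermark(leader, Seq(follower))
    println(leader.highWatermark) // prints 7: consumers may read only up to the HW
  }
}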

The way this class overrides equals is also worth studying: it overrides hashCode together with equals, which is the rigorous practice recommended by the textbooks. A small standalone example is sketched below.
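As a reminder of that contract, here is a small, self-contained example in the same spirit (not Kafka code); the TopicBroker class is made up purely for illustration.

import scala.collection.mutable

// Minimal illustration of the equals/hashCode contract, in the same spirit as Replica.
class TopicBroker(val topic: String, val brokerId: Int) {
  override def equals(that: Any): Boolean = that match {
    case other: TopicBroker => topic == other.topic && brokerId == other.brokerId
    case _ => false
  }
  // Equal objects must produce equal hash codes, otherwise hash-based
  // collections (HashMap, HashSet) treat them as different keys.
  override def hashCode(): Int = 31 * topic.hashCode + brokerId
}

object EqualsHashCodeDemo {
  def main(args: Array[String]): Unit = {
    val a = new TopicBroker("orders", 1)
    val b = new TopicBroker("orders", 1)
    println(a == b)                            // true, thanks to the equals override
    println(mutable.HashSet(a).contains(b))    // true only because hashCode is consistent with equals
  }
}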

2. ReplicaManager

This class provides Kafka's most important HA capability and contains a large number of methods.

Its most important responsibilities are the following (the code is fairly simple and omitted):

Controlling the ProducerRequestPurgatory and FetchRequestPurgatory objects it owns, proactively calling their update and respond methods to drive the consumption of the messages held in them.

Starting, stopping, and retrieving replicas. Before Kafka 0.8 there was no replication: once a broker went down, none of the partition data on it could be consumed. Versions 0.8 and later added this mechanism as part of Kafka's HA features. For better load balancing, Kafka tries to distribute all partitions evenly across the cluster; a typical deployment has more partitions per topic than brokers. To improve fault tolerance, the replicas of the same partition should also be spread across different machines: if all replicas lived on the same broker, that broker going down would take every replica of the partition with it, and there would be no HA at all. Likewise, when a broker does fail, its load should be spread evenly over the surviving brokers (a simplified assignment sketch follows this list).

Reading data: based on the contents of the request, it fetches the replica data for each (topic, partition) combination; this is in fact implemented by calling the replica-retrieval methods.

Becoming a leader or follower. This handles LeaderAndIsrRequest and is tied to Kafka's leader-election mechanism. With replication, the same partition may have several replicas, and a leader must be elected among them; producers and consumers interact only with the leader, while the other replicas act as followers and copy data from it. The reason is that data consistency must be guaranteed across the replicas of a partition (if one of them dies, the others must keep serving without duplicating or losing data). Without a leader, every replica could read and write, and all replicas would have to synchronize with each other over N×N channels (with 5 replicas, that is on the order of 5×5 = 25 channels instead of 4 follower-to-leader ones); keeping the data consistent and ordered would be very hard, greatly complicating the replication implementation and raising the chance of anomalies. With a leader, only the leader handles reads and writes and the followers simply fetch from it in order over N channels, which makes the system simpler and more efficient.
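To illustrate the even-distribution idea mentioned in the item on starting/stopping replicas, here is a hedged, simplified sketch of round-robin replica assignment; it only approximates Kafka's real assignment logic, and assignReplicas is a made-up helper.

// Simplified sketch of spreading partition replicas round-robin across brokers.
// This only approximates Kafka's real assignment logic.
object ReplicaAssignmentSketch {
  def assignReplicas(brokers: Seq[Int], numPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size, "not enough brokers for the requested replication factor")
    (0 until numPartitions).map { partition =>
      // Start each partition's replica list at a different broker and walk the broker list,
      // so both leaders and followers end up spread across the cluster.
      val replicas = (0 until replicationFactor).map { r =>
        brokers((partition + r) % brokers.size)
      }
      partition -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 3 brokers, 6 partitions, 2 replicas each: no broker holds both replicas of a partition.
    assignReplicas(Seq(0, 1, 2), numPartitions = 6, replicationFactor = 2)
      .toSeq.sortBy(_._1)
      .foreach { case (p, rs) => println(s"partition $p -> replicas ${rs.mkString(",")}") }
  }
}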

3. PartitionLeaderSelector

Five classes implement this trait to realize the leader-election mechanism: OfflinePartitionLeaderSelector, ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, ControlledShutdownLeaderSelector, and NoOpLeaderSelector (which does nothing useful). KafkaController decides which selector to call depending on how the leader became unavailable. (The ReplicationController in the earlier architecture diagram is now part of KafkaController.)

Kafka dynamically maintains an ISR (in-sync replicas) set in ZooKeeper. Every replica in the ISR has caught up with the leader, and only ISR members are eligible to be elected leader. In this model, with f+1 replicas a partition can tolerate the failure of f replicas without losing any committed message (for example, a replication factor of 3 survives the loss of 2 brokers). For most use cases this is a very favorable trade-off.

OfflinePartitionLeaderSelector為例,該類用於在一個onlinepartition突然offline時重新選擇leader,程式碼如下,其演算法介紹在頭部註釋:

/**
 * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
 * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
 *    isr as the new isr.
 * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
 * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
 * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
 * Replicas to receive LeaderAndIsr request = live assigned replicas
 * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
 */
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true =>
            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
            // for unclean leader election.
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }

            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true =>
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false =>
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false =>
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
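To see the rules from the header comment in isolation, here is a hedged, self-contained restatement of the selection logic; SimpleLeaderAndIsr and selectOfflineLeader are simplified stand-ins for the real controller types, not the actual API.

// A simplified, standalone restatement of the OfflinePartitionLeaderSelector rules.
case class SimpleLeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

object OfflineSelectionSketch {
  def selectOfflineLeader(assignedReplicas: Seq[Int],
                          current: SimpleLeaderAndIsr,
                          liveBrokers: Set[Int],
                          uncleanElectionEnabled: Boolean): SimpleLeaderAndIsr = {
    val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
    val liveIsr = current.isr.filter(liveBrokers.contains)
    if (liveIsr.nonEmpty) {
      // Rule 1: prefer a live replica that is still in the ISR; the live ISR becomes the new ISR.
      val newLeader = liveAssigned.filter(liveIsr.contains).head
      SimpleLeaderAndIsr(newLeader, current.leaderEpoch + 1, liveIsr, current.zkVersion + 1)
    } else if (!uncleanElectionEnabled) {
      // Rule 2: no live ISR member and unclean election disabled -> fail instead of risking data loss.
      throw new IllegalStateException("No broker in ISR is alive and unclean leader election is disabled")
    } else if (liveAssigned.nonEmpty) {
      // Rule 3: unclean election -- pick any live assigned replica; committed data may be lost.
      SimpleLeaderAndIsr(liveAssigned.head, current.leaderEpoch + 1, List(liveAssigned.head), current.zkVersion + 1)
    } else {
      // Rule 4: nothing alive at all.
      throw new IllegalStateException("No assigned replica for this partition is alive")
    }
  }

  def main(args: Array[String]): Unit = {
    val current = SimpleLeaderAndIsr(leader = 1, leaderEpoch = 5, isr = List(1, 2), zkVersion = 7)
    // Broker 1 is down but broker 2 (still in the ISR) is alive -> clean election of broker 2.
    println(selectOfflineLeader(Seq(1, 2, 3), current, liveBrokers = Set(2, 3), uncleanElectionEnabled = false))
  }
}

The unclean path (rule 3) is exactly the availability-versus-durability trade-off: the partition comes back online, but messages committed only on dead replicas may be lost, which is why the real code marks uncleanLeaderElectionRate and logs a warning.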

4. OffsetManager

As mentioned earlier, when an OffsetCommitRequest with offsetCommitRequest.versionId = 1 is handled, handleProducerOrOffsetCommitRequest is called and the offset is written into a dedicated topic (the offsets topic); OffsetManager is designed exactly for this scenario. Moving offset management from ZooKeeper back into Kafka is, I think, mainly about efficiency: when the number of (group, topic, partition) combinations is very large, reading and writing the corresponding ZooKeeper znodes is far from free.
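The partitionFor(group) call that appears in the code below maps a consumer group to one partition of the offsets topic; as far as I understand it, this is just a hash of the group name modulo the offsets-topic partition count. Here is a hedged sketch of that idea (the partition count of 50 is only an illustrative value):

// A hedged sketch of mapping a consumer group to a partition of the offsets topic.
// The real OffsetManager does essentially hash(group) mod <offsets-topic partition count>.
object OffsetsPartitionSketch {
  // 50 is only an illustrative partition count for the offsets topic.
  val offsetsTopicNumPartitions = 50

  def partitionFor(group: String): Int =
    (group.hashCode & 0x7fffffff) % offsetsTopicNumPartitions  // mask the sign bit so the result is non-negative

  def main(args: Array[String]): Unit = {
    // Every commit/fetch for the same group lands on the same offsets partition, so the
    // leader of that partition acts as the offset coordinator for the whole group.
    println(partitionFor("payment-consumers"))
    println(partitionFor("audit-consumers"))
  }
}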

The main code is shown below; as you can see, the methods that manage the offsets topic are not very different from those that manage an ordinary topic:

/**
   * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
   *
   * @param key The requested group-topic-partition
   * @return If the key is present, return the offset and metadata; otherwise return None
   */
  private def getOffset(key: GroupTopicPartition) = {
    val offsetAndMetadata = offsetsCache.get(key)
    if (offsetAndMetadata == null)
      OffsetMetadataAndError.NoOffset
    else
      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
  }

  /**
   * Put the (already committed) offset for the given group/topic/partition into the cache.
   *
   * @param key The group-topic-partition
   * @param offsetAndMetadata The offset/metadata to be stored
   */
  private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
    offsetsCache.put(key, offsetAndMetadata)
  }

  def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndMetadata]) {
    // this method is called _after_ the offsets have been durably appended to the commit log, so there is no need to
    // check for current leadership as we do for the offset fetch
    trace("Putting offsets %s for group %s in offsets partition %d.".format(offsets, group, partitionFor(group)))
    offsets.foreach { case (topicAndPartition, offsetAndMetadata) =>
      putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndMetadata)
    }
  }

  /**
   * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
   * returns the current offset or it begins to sync the cache from the log (and returns an error code).
   */
  def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
    trace("Getting offsets %s for group %s.".format(topicPartitions, group))

    val offsetsPartition = partitionFor(group)

    /**
     * followerTransitionLock protects against fetching from an empty/cleared offset cache (i.e., cleared due to a
     * leader->follower transition). i.e., even if leader-is-local is true a follower transition can occur right after
     * the check and clear the cache. i.e., we would read from the empty cache and incorrectly return NoOffset.
     */
    followerTransitionLock synchronized {
      if (leaderIsLocal(offsetsPartition)) {
        if (loadingPartitions synchronized loadingPartitions.contains(offsetsPartition)) {
          debug("Cannot fetch offsets for group %s due to ongoing offset load.".format(group))
          topicPartitions.map { topicAndPartition =>
            val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
            (groupTopicPartition.topicPartition, OffsetMetadataAndError.OffsetsLoading)
          }.toMap
        } else {
          if (topicPartitions.size == 0) {
           // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
            offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
              (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
            }.toMap
          } else {
            topicPartitions.map { topicAndPartition =>
              val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
              (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
            }.toMap
          }
        }
      } else {
        debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
        topicPartitions.map { topicAndPartition =>
          val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
          (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup)
        }.toMap
      }
    }
  }

  /**
   * Asynchronously read the partition from the offsets topic and populate the cache
   */
  def loadOffsetsFromLog(offsetsPartition: Int) {

    val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)

    loadingPartitions synchronized {
      if (loadingPartitions.contains(offsetsPartition)) {
        info("Offset load from %s already in progress.".format(topicPartition))
      } else {
        loadingPartitions.add(offsetsPartition)
        scheduler.schedule(topicPartition.toString, loadOffsets)
      }
    }

    def loadOffsets() {
      info("Loading offsets from " + topicPartition)

      val startMs = SystemTime.milliseconds
      try {
        replicaManager.logManager.getLog(topicPartition) match {
          case Some(log) =>
            var currOffset = log.logSegments.head.baseOffset
            val buffer = ByteBuffer.allocate(config.loadBufferSize)
            // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
              buffer.clear()
              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
              messages.readInto(buffer, 0)
              val messageSet = new ByteBufferMessageSet(buffer)
              messageSet.foreach { msgAndOffset =>
                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
                val key = OffsetManager.readMessageKey(msgAndOffset.message.key)
                if (msgAndOffset.message.payload == null) {
                  if (offsetsCache.remove(key) != null)
                    trace("Removed offset for %s due to tombstone entry.".format(key))
                  else
                    trace("Ignoring redundant tombstone for %s.".format(key))
                } else {
                  val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
                  putOffset(key, value)
                  trace("Loaded offset %s for %s.".format(value, key))
                }
                currOffset = msgAndOffset.nextOffset
              }
            }

            if (!shuttingDown.get())
              info("Finished loading offsets from %s in %d milliseconds."
                   .format(topicPartition, SystemTime.milliseconds - startMs))
          case None =>
            warn("No log found for " + topicPartition)
        }
      }
      catch {
        case t: Throwable =>
          error("Error in loading offsets from " + topicPartition, t)
      }
      finally {
        loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
      }
    }
  }

5. AbstractFetcherManager / AbstractFetcherThread

These two abstract classes manage partition fetchers, i.e. Kafka's data-fetching machinery. The Manager's job is to create Threads and bind them to partitions (or remove them from partitions). The Thread's job is to handle FetchRequests: starting from the current offset of the specified partition, it keeps reading data up to, but not beyond, the HW. The current offset of each partition is tracked in an in-memory HashMap keyed by (topic, partition), and all reads and writes of that map are mutually exclusive. A minimal sketch of such an offset registry follows.
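Below is a minimal, hedged sketch of such a per-partition offset registry with mutually exclusive access; FetchOffsetRegistry and its methods are hypothetical names, not the actual AbstractFetcherThread fields.

import scala.collection.mutable

// Hypothetical sketch of the (topic, partition) -> offset map described above.
// AbstractFetcherThread keeps a similar map and guards it with a lock.
class FetchOffsetRegistry {
  private val partitionOffsets = mutable.HashMap[(String, Int), Long]()

  // All access is synchronized, so concurrent add/remove/update calls cannot interleave.
  def addPartition(topic: String, partition: Int, initialOffset: Long): Unit = synchronized {
    partitionOffsets.getOrElseUpdate((topic, partition), initialOffset)
  }

  def removePartition(topic: String, partition: Int): Unit = synchronized {
    partitionOffsets.remove((topic, partition))
  }

  // Called after a fetch response is processed, to advance the offset the next fetch starts from.
  def updateFetchOffset(topic: String, partition: Int, nextOffset: Long): Unit = synchronized {
    partitionOffsets.update((topic, partition), nextOffset)
  }

  def fetchOffset(topic: String, partition: Int): Option[Long] = synchronized {
    partitionOffsets.get((topic, partition))
  }
}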

ReplicaFetcherManager / ReplicaFetcherThread are the concrete implementations of these two abstract classes; their code is omitted.