1. 程式人生 > >kafka原始碼分析之kafkacluster的管理-KafkaController

kafka原始碼分析之kafkacluster的管理-KafkaController

KafkaController

說明,這個例項主要用於對kafka cluster進行管理,一個kafkacluster表示同一個zk環境下所有的broker的集合,在這個cluster中需要有一個broker被選舉成為leader,用於管理其它的broker的上線與下線的處理,對partition與副本的分配,管理topic的新增/修改/刪除操作。

例項建立與啟動

/* start kafka controller */kafkaController new KafkaController(configzkUtilsbrokerState,

kafkaMetricsTime

metricsthreadNamePrefix)kafkaController.startup()

例項初始化:

this.logIdent "[Controller " + config.brokerId "]: "private var isRunning trueprivate val stateChangeLogger = KafkaController.stateChangeLoggerval controllerContext new ControllerContext(zkUtilsconfig.zkSessionTimeoutMs)

用於對partition的狀態進行維護與更新操作.

val partitionStateMachine new PartitionStateMachine(this)

用於對每個partition的每一個副本的狀態進行維護與更新操作.val replicaStateMachine new ReplicaStateMachine(this)

生成用於監聽leader發生變化的zookeeper的監聽,用於當前的broker正在成為leader或者broker正在失去leader.private val controllerElector new ZookeeperLeaderElector(controllerContext

ZkUtils.ControllerPathonControllerFailover,onControllerResignationconfig.brokerId)private val autoRebalanceScheduler new KafkaScheduler(1)var deleteTopicManager: TopicDeletionManager = null

下面生成用於對partition的leader的選擇的對應的選擇器,包含partition載入時,重新分配時,首先副本,controll下線val offlinePartitionSelector new 

OfflinePartitionLeaderSelector(controllerContextconfig)private val reassignedPartitionLeaderSelector new 

ReassignedPartitionLeaderSelector(controllerContext)private val preferredReplicaPartitionLeaderSelector new 

PreferredReplicaPartitionLeaderSelector(controllerContext)private val controlledShutdownPartitionLeaderSelector new 

ControlledShutdownLeaderSelector(controllerContext)private val brokerRequestBatch new ControllerBrokerRequestBatch(this)

這裡生成幾個用於監聽partition的修改與isr的修改的監聽器.private val partitionReassignedListener new PartitionsReassignedListener(this)

這個用於在對partition的首先副本進行顯示重新修改時,

用於監聽對/admin/preferred_replica_election路徑的修改.private val preferredReplicaElectionListener new 

PreferredReplicaElectionListener(this)

用於監聽當partition的leader發生變化時,更新partitionLeadershipInfo集合的內容,同時向所有的brokers節點發送metadata修改的請求.private val isrChangeNotificationListener new 

IsrChangeNotificationListener(this)

KafkaController的例項啟動:

def startup() = {inLock(controllerContext.controllerLock) {    info("Controller starting up")

這裡有於監聽對ZK的session的註冊,當向zk進行註冊時,生成一個session後會呼叫onControllerResignation函式.這個函式會先取消對zookeeper的監聽(partition相關),停止掉對應的管理組合.    registerSessionExpirationListener()isRunning true

在/controller的zk路徑上註冊監聽LeaderChangeListener例項.並執行選舉.controllerElector.startup    info("Controller startup complete")  }}

leader的選舉與動作

啟動時的選舉

通過在controller啟動時呼叫ZookeeperLeaderElector例項中的startup函式.

這個函式在/controller中註冊一個LeaderChangeListener的監聽器,用於監聽controller中的內容改變與刪除.並執行函式進行elect選舉.

defelect: Boolean = {val timestamp = SystemTime.milliseconds.toStringval electString = Json.encode(Map("version" -> 1,

"brokerid" -> brokerId"timestamp" -> timestamp))

這裡直接從zookeeper中找到/controller路徑下的內容,如果路徑下存在內容時,直接讀取這個節點的內容,並解析出leaderid(也就是被選為leader的broker),如果leader已經被選舉出來,直接返回.leaderId = getControllerID if(leaderId != -1) {     debug("Broker %d has been elected as leader, so stopping the election 

           process.".format(leaderId))return amILeader  }

流程執行到這裡,表示當前還沒有broker節點被選舉為leader.try {

這裡嘗試把當前節點註冊成kafka的leader,通過create函式,如果這個函式執行成功,沒有返回異常時,表示當前節點被選舉成了leader,把當前的brokerId設定到leaderId的屬性上.val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,electString,                                               

controllerContext.zkUtils.zkConnection.getZookeeper,

                        JaasUtils.isZkSecurityEnabled())    zkCheckedEphemeral.create()    info(brokerId + " successfully elected as leader")leaderId = brokerId

這裡執行生成此例項傳入的函式來執行leader的進入,這個函式的實現為KafkaController中onControllerFailover的函式.    onBecomingLeader()  } catch {case e: ZkNodeExistsException =>

如果通過ZKCheckedEphemeral去在zk中建立內容時失敗,返回的錯誤是節點已經存在時,表示在這個kafka的server註冊前,已經被其它的節點註冊成了leader,讀取/controller的路徑,並設定其內容中的brokerId為leaderId.// If someone else has written the path, thenleaderId = getControllerID if (leaderId != -1)        debug("Broker %d was elected as leader instead of broker %d"

.format(leaderIdbrokerId))elsewarn("A leader has been elected but just resigned, this will result in another 

             round of election")case e2: Throwable =>

如果是其它的錯誤,設定leaderId的值為-1,同時刪除/controller的路徑的內容.      error("Error while electing or becoming leader on broker %d"

.format(brokerId)e2)      resign()  }

這裡返回的值如果是true表示當前節點就是leader,否則表示當前節點不是leader,對應的leaderId值就是leader節點的brokerId.

amILeader}

Broker被選舉成leadercontroller的處理

噹噹前的kafka server被選舉成為leader,會執行KafkaController中的onControllerFailover函式來處理leader的進入.

defonControllerFailover() {if(isRunning) {    info("Broker %d starting become controller state 

           transition".format(config.brokerId))//read controller epoch from zk

/controller_epoch路徑上讀取controllerepoch的值,這個值用於控制controller的切換.

把讀取到的值儲存到controllerContext中的epochepochZkVersion屬性中.

readControllerEpochFromZookeeper()

更新controllerContext中的epoch的值(加1),並持久化到/controller_epoch路徑上.// increment the controller epochincrementControllerEpoch(zkUtils.zkClient)

註冊對/admin/reassign_partitions節點的監聽處理程式,由PartitionsReassignedListener實現.用於監聽partition的重新分配的動作.主要用於監聽節點的內容修改registerReassignedPartitionsListener()

註冊對/isr_change_notification節點的監聽處理程式,這個節點主要用於通知partitoin的isr的變化,由IsrChangeNotificationListener實現.主要用於監聽節點的內容修改,    registerIsrChangeNotificationListener()

註冊對/admin/preferred_replica_election節點的監聽處理程式,這個節點用於對副本的首選節點進行處理,由PreferredReplicaElectionListener實現.主要用於監聽節點的內容修改.    registerPreferredReplicaElectionListener()

在partitionStateMachine中對/brokers/topics節點註冊監聽處理程式,用於監聽topic的修改,由TopicChangeListener實現.主要用於監聽這個節點的子節點的修改.

如果配置有deletetopic的啟用時,通過配置delete.topic.enable,預設為false.如果這個值配置為true時,對/admin/delete_topics節點註冊一個DeleteTopicsListener監聽處理程式,用於監聽這個節點下的子節點的修改.partitionStateMachine.registerListeners()

/brokers/ids節點註冊一個BrokerChangeListener監聽處理程式,用於監聽這個節點的子節點的修改,主要用於監聽broker的的改變.replicaStateMachine.registerListeners()

初始化controller的上下文.    initializeControllerContext()

啟動對broker狀態的監聽與partition的狀態監聽的例項.replicaStateMachine.startup()partitionStateMachine.startup()

根據現在kafka中所有的topic,對/brokers/topics/topicname節點註冊一個AddPartitionsListener監聽處理程式,用於監聽這個topic的修改.// register the partition change listeners for all existing topics on failovercontrollerContext.allTopics.foreach(topic => 

partitionStateMachine.registerPartitionChangeListener(topic))    info("Broker %d is ready to serve as the new controller with epoch 

        %d".format(config.brokerIdepoch))

設定當前的broker的狀態為RunningASController的broker,表示這是一個leader的節點.    brokerState.newState(RunningAsController)

這裡對未完成partition的副本重新分配的partitionsBeingReassigned集合進行迭代,執行如下的流程處理:

1,根據準備重新分配的partition的副本所在的節點集合,檢查當前liveBrokers中是否都存在這些節點,如果要重新分配的節點集合中有在liveBrokers中不包含的節點,表示要分配的副本所在節點有沒有啟動的節點,throw exception,

2,根據需要重新分配的partition從partitionReplicaAssignment集合中找到對應的partition的資訊,這個集合中儲存了已經分配的partition的副本資訊,如果在已經分配的partition的集合中找不到這個partition,throw exception.

3,如果準備重新分配的副本節點集合與現在partitionReplicaAssignment集合中parition對應的副本節點集合是相同的內容,表示重新分配是沒有必要的,throw exception.

4,這種情況表示重新分配的副本節點集合對應的節點都已經啟動,同時這個集合與現在此partition對應的分配副本節點集合不相同,執行如下的子流程:

4,1,在/brokers/topics/tpname/partitions/pid/state路徑上生成註冊一個用於監聽isr的變化的ReassignedPartitionsIsrChangeListener監聽程式.

4,2,在deleteTopicManager中檢查這個topic是否是需要刪除的topic,如果是,新增到準備刪除的topic的集合中.

4,3,執行對partition中副本的重新分配,通過onPartitionReassignment函式.    maybeTriggerPartitionReassignment()

這裡對還未完成首選副本分配的partition進行首選副本分配的操作,這些未分配首選副本儲存在partitionsUndergoingPreferredReplicaElection集合中.

1,首先檢查對應的partitions的topic是否是已經被刪除的topic,如果包含有要刪除的topic時,把對應的partitions集合新增到待刪除的topic partitions的集合中.

2,通過partitionStateMachine例項修改要進行首選副本分配的所有的partitions的狀態為OnlinePartition.並通過preferredReplicaPartitionLeaderSelector例項進行partition的首選副本的選擇操作,通過讀取/brokers/topics/topicname/partitions/pid/state路徑的isr的資訊,如果這個路徑還不存在時,根據當前partition的所有活著的副本集合,取第一個副本為leader,並把這個副本集合儲存到這個路徑中,根據讀取這個路徑的資訊,通過leaderSelector來進行首選副本的分配.

3,更新partitionLeadershipInfo集合的內容,把這個partition對應的isr儲存到這個集合中,並向對應的broker節點發送LeaderAndIsrRequest請求.

4,移出partitionsUndergoingPreferredReplicaElection集合中的內容,

        並刪除/admin/preferred_replica_election節點的資料.    maybeTriggerPreferredReplicaElection()

向所有的broker的節點發送全部topic的metadata更新的UpdateMetadataRequest請求./* send partition leadership info to all live brokers */sendUpdateMetadataRequest(

controllerContext.liveOrShuttingDownBrokerIds.toSeq)

如果auto.leader.rebalance.enable配置為true,預設值也是true,

根據leader.imbalance.check.interval.seconds配置的間隔時間,對partition進行balance操作.預設配置為300秒.定時執行的排程函式為checkAndTriggerPartitionRebalance.if (config.autoLeaderRebalanceEnable) {      info("starting the partition rebalance scheduler")autoRebalanceScheduler.startup()autoRebalanceScheduler.schedule("partition-rebalance-thread",

checkAndTriggerPartitionRebalance,5config.leaderImbalanceCheckIntervalSeconds.toLongTimeUnit.SECONDS)    }

啟動刪除topic的管理元件TopicDeletionManager,這個例項中生成一個DeleteTopicsThread執行緒,前提是delete.topic.enable配置值為true.否則這個例項什麼都不做.deleteTopicManager.start()  }elseinfo("Controller has been shut down, aborting startup/failover")}

初始化controller上下文

這個只有在當前對應的brokerleader節點時,才會被執行.

private def initializeControllerContext() {

這裡得到節點下所有的子節點的資料,這裡儲存有當前所有啟動的broker的資訊,通過這些資訊生成Broker例項,並存儲到controllerContext中的liveBrokers集合中,每一個Broker例項包含有這個broker的連線資訊.// update controller cache with delete topic informationcontrollerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet

得到/brokers/topics節點下所有的子節點資訊,這裡儲存有當前kafka的叢集中建立過的所有的topic資訊,並把topic儲存到allTopics的集合中.controllerContext.allTopics = zkUtils.getAllTopics().toSet

根據kafka cluster中所有的topic,找到對應/brokers/topics/topicname節點下的所有的partitions的配置,一個partition中包含有partition的id與對應的多個副本地址.

在controllerContext.partitionReplicaAssignment中儲存有每個partition對應的副本位置.controllerContext.partitionReplicaAssignment 

      zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)

這裡這個集合用於儲存每一個partition對應的leader的更換順序.controllerContext.partitionLeadershipInfo new 

mutable.HashMap[TopicAndPartitionLeaderIsrAndControllerEpoch]

這個用於儲存已經下線的broker的集合.controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]

/brokers/topics/topicname/partitions/pid/state節點下讀取對應的partition的leader的節點的變換的順序集合.並存儲到controllerContext中的partitionLeadershipInfo集合中.// update the leader and isr cache for all existing partitions from ZookeeperupdateLeaderAndIsrCache()

這裡生成ControllerChannelManager例項並根據broker的個數,針對第一個broker生成一個socket的連線,並啟動對應的訊息傳送的RequestSendThread執行緒,用於向各個broker進行通訊.// start the channel managerstartChannelManager()

這裡主要是把還沒有進行首選副本的選舉的partition進行處理,未進行首選副本處理的partition儲存 在/admin/preferred_replica_election節點,從這個節點讀取資料這裡儲存有每一個topic與partition的資訊,並根據topic與parition進行分組,排除掉相同的topicAndPartition的記錄.

儲存到controllerContext.partitionsUndergoingPreferredReplicaElection集合中,通過檢查每個partition對應的isr的leader是否是replica副本集合中的第一個broker或者說這個partition對應的replica的集合為空(這個表示topic被刪除掉了),如果是這種條件時,表示這些partition的首選副本選舉已經完成,

把這個partition從partitionsUndergoingPreferredReplicaElection集合中稱出.,  initializePreferredReplicaElection()

這裡主要把需要進行重新分配的partition從/admin/reassign_partitions節點讀取出來,這裡得到的是需要進行重新分配的所有的partition與對應的新的副本節點id.取出副本個數為0(刪除的topic)或者分配的partition的副本與準備重新分配的副本相同的所有的partition,這些partition表示已經分配完成,把所有的準備重新分配的partitions新增到context中的partitionsBeingReassigned集合中,並從集合中移出被刪除掉的topic的partition與已經完成分配的parition的資訊.  initializePartitionReassignment()

這裡生成topic刪除的管理元件,

1,首先根據partitionReplicaAssignment集合中每個partition對應的副本節點,判斷這些副本所對應的在節點在liveBrokers裡面是否存在,找到不存在的所有的topic集合.

2,得到partitionsUndergoingPreferredReplicaElection集合中還未完成首選副本分配的所有的partition對應的topic集合.

3,得到partitionsBeingReassigned集合中準備進行副本重新分配的所有partition對應的topic的集合.

4,根據上面3個計算出來的集合取並集,並根據zk中/admin/delete_topics節點下的所有的子節點(要刪除的topic),生成TopicDeletionManager例項,這個例項用於控制對需要刪除的topic進行刪除操作,在這個例項中,可以刪除的topic是根據前3個步驟計算出來的topic與zk中配置已經標記為刪除的topic的集合取交集的集合(有可能刪除配置中的topic已經被刪除完成,所以取配置為刪除,但還沒有被刪除的topic).  initializeTopicDeletion()  info("Currently active brokers in the cluster: %s"

.format(controllerContext.liveBrokerIds))  info("Currently shutting brokers in the cluster: %s"

.format(controllerContext.shuttingDownBrokerIds))

相關推薦

kafka原始碼分析kafkacluster管理-KafkaController

KafkaController 說明,這個例項主要用於對kafka cluster進行管理,一個kafka的cluster表示同一個zk環境下所有的broker的集合,在這個cluster中需要有一個broker被選舉成為leader,用於管理其它的broker的上線與

Kafka 原始碼分析LogSegment

這裡分析kafka LogSegment原始碼 通過一步步分析LogManager,Log原始碼之後就會發現,最終的log操作都在LogSegment上實現.LogSegment負責分片的讀寫恢復重新整理刪除等動作都在這裡實現.LogSegment程式碼同樣在原始碼目錄log下. LogSe

kafka原始碼分析consumer的原始碼

Consumer的client端 示例程式碼 Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SER

elasticsearch原始碼分析叢集管理

一、背景 Elasticsearch是一個實時分散式搜尋和分析引擎。它讓你以前所未有的速度處理大資料成為可能。本文主要介紹實現分散式搜尋和分析的基礎–儲存,好的儲存設計在根本上決定了查詢的效能。 es的儲存本質上是採用了lucene全文索引,在其基礎上實現了分散式功

kafka原始碼分析kafkaApis

KafkaApis 說明:用於處理對kafka的訊息請求的中心轉發元件,kafkaapis需要依賴於如下幾個元件: apis = new KafkaApis(socketServer.requestChannel, replicaManager,  consumer

kafka原始碼解析十二KafkaController(上篇)

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { …… private val c

kafka原始碼分析producer

Producer的client端 示例程式碼 Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("client.id", "De

Netty4原始碼分析記憶體管理

引用計數netty中使用引用計數機制來管理資源,當一個實現ReferenceCounted的物件例項化時,引用計數置1.客戶程式碼中需要保持一個該物件的引用時需要呼叫介面的retain方法將計數增1.物件使用完畢時呼叫release將計數減1.當引用計數變為0時,物件將釋放

【React Native】原始碼分析Native UI的封裝和管理

  ReactNative作為使用React開發Native應用的新框架,隨著時間的增加,無論是社群還是個人對她的興趣與日遞增。此文目的是希望和大家一起欣賞一下ReactNative的部分原始碼。閱讀原始碼好處多多,讓攻城獅更溜的開發ReactNative應

MySQL系列:innodb原始碼分析表空間管理

innodb在實現表空間(table space)基於檔案IO之上構建的一層邏輯儲存空間管理,table space採用邏輯分層的結構:space、segment inode、extent和page.在實現層的邏輯使用了磁碟連結串列這種結構來管理邏輯關係。我們先來介紹磁碟連

spark 原始碼分析十五 -- Spark記憶體管理剖析

本篇文章主要剖析Spark的記憶體管理體系。 在上篇文章 spark 原始碼分析之十四 -- broadcast 是如何實現的?中對儲存相關的內容沒有做過多的剖析,下面計劃先剖析Spark的記憶體機制,進而進入記憶體儲存,最後再剖析磁碟儲存。本篇文章主要剖析記憶體管理機制。 整體介紹 Spar

spark 原始碼分析二十二-- Task的記憶體管理

問題的提出 本篇文章將回答如下問題: 1.  spark任務在執行的時候,其記憶體是如何管理的? 2. 堆內記憶體的定址是如何設計的?是如何避免由於JVM的GC的存在引起的記憶體地址變化的?其內部的記憶體快取池回收機制是如何設計的? 3. 堆外和堆內記憶體分別是通過什麼來分配的?其資料的偏移

Springboot原始碼分析事務攔截和管理

摘要: 在springboot的自動裝配事務裡面,InfrastructureAdvisorAutoProxyCreator ,TransactionInterceptor,PlatformTransactionManager這三個bean都被裝配進來了,InfrastructureAdvisorAutoPr

Kafka原始碼分析及圖解原理Producer端

一.前言   任何訊息佇列都是萬變不離其宗都是3部分,訊息生產者(Producer)、訊息消費者(Consumer)和服務載體(在Kafka中用Broker指代)。那麼本篇主要講解Producer端,會有適當的圖解幫助理解底層原理。   一.開發應用   首先介紹一下開發應用,如何構建一個KafkaP

Kafka原始碼分析及圖解原理Broker端

一.前言   https://www.cnblogs.com/GrimMjx/p/11354987.html   上一節說過,任何訊息佇列都是萬變不離其宗都是3部分,訊息生產者(Producer)、訊息消費者(Consumer)和服務載體(在Kafka中用Broker指代)。上一節講了kafka prod

Spark原始碼分析Spark Shell(上)

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

Netty 原始碼分析拆包器的奧祕

為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資

Android原始碼分析為什麼在onCreate() 和 onResume() 獲取不到 View 的寬高

轉載自:https://www.jianshu.com/p/d7ab114ac1f7 先來看一段很熟悉的程式碼,可能在最開始接觸安卓的時候,大部分人都寫過的一段程式碼;即嘗試在 onCreate() 和 onResume() 方法中去獲取某個 View 的寬高資訊: 但是列印輸出後,我們會發

netty原始碼分析服務端啟動

ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB

SNMP原始碼分析(一)配置檔案部分

snmpd.conf想必不陌生。在程序啟動過程中會去讀取配置檔案中各個配置。其中幾個引數需要先知道是幹什麼的:   token:配置檔案的每行的開頭,例如 group MyROGroup v1 readSec 這行token的引數是group。