kafka叢集Controller競選與責任設計思路架構詳解-kafka 商業環境實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

1 無所不能的Controller
-
某一個broker被選舉出來承擔特殊的角色,就是控制器Controller。
-
Leader會向zookeeper上註冊Watcher,其他broker幾乎不用監聽zookeeper的狀態變化。
-
Controller叢集就是用來管理和協調Kafka叢集的,具體就是管理叢集中所有分割槽的狀態和分割槽對應副本的狀態。
-
每一個Kafka叢集任意時刻都只能有一個controller,當叢集啟動的時候,所有的broker都會參與到controller的競選,最終只能有一個broker勝出。
-
Controller維護的狀態分為兩類:1:管理每一臺Broker上對應的分割槽副本。2:管理每一個Topic分割槽的狀態。
-
KafkaController 核心程式碼,其中包含副本狀態機和分割槽狀態機
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId) // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) var deleteTopicManager: TopicDeletionManager = null val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) 複製程式碼
-
KafkaController中共定義了五種selector選舉器
1、ReassignedPartitionLeaderSelector 從可用的ISR中選取第一個作為leader,把當前的ISR作為新的ISR,將重分配的副本集合作為接收LeaderAndIsr請求的副本集合。 2、PreferredReplicaPartitionLeaderSelector 如果從assignedReplicas取出的第一個副本就是分割槽leader的話,則丟擲異常,否則將第一個副本設定為分割槽leader。 3、ControlledShutdownLeaderSelector 將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,然後選取第一個副本作為leader,然後令當前AR作為接收LeaderAndIsr請求的副本。 4、NoOpLeaderSelector 原則上不做任何事情,返回當前的leader和isr。 5、OfflinePartitionLeaderSelector 從活著的ISR中選擇一個broker作為leader,如果ISR中沒有活著的副本,則從assignedReplicas中選擇一個副本作為leader,leader選舉成功後註冊到Zookeeper中,並更新所有的快取。 複製程式碼
-
kafka修改分割槽和副本數
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe--topic test1 Topic:test1PartitionCount:3ReplicationFactor:2Configs: Topic: test1Partition: 0Leader: 2Replicas: 2,4Isr: 2,4 Topic: test1Partition: 1Leader: 3Replicas: 3,5Isr: 3,5 Topic: test1Partition: 2Leader: 4Replicas: 4,1Isr: 4,1 複製程式碼
-
topic 分割槽擴容
./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1 複製程式碼
2 ReplicaStateMachine (ZK持久化副本分配方案)
-
Replica有7種狀態:
1 NewReplica: 在partition reassignment期間KafkaController建立New replica 2 OnlineReplica: 當一個replica變為一個parition的assingned replicas時 其狀態變為OnlineReplica, 即一個有效的OnlineReplica 3 Online狀態的parition才能轉變為leader或isr中的一員 4 OfflineReplica: 當一個broker down時, 上面的replica也隨之die, 其狀態轉變為Onffline; ReplicaDeletionStarted: 當一個replica的刪除操作開始時,其狀態轉變為ReplicaDeletionStarted 5 ReplicaDeletionSuccessful: Replica成功刪除後,其狀態轉變為ReplicaDeletionSuccessful 6 ReplicaDeletionIneligible: Replica成功失敗後,其狀態轉變為ReplicaDeletionIneligible 7 NonExistentReplica:Replica成功刪除後, 從ReplicaDeletionSuccessful狀態轉變為NonExistentReplica狀態 複製程式碼
-
ReplicaStateMachine 所在檔案: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
-
startup: 啟動ReplicaStateMachine
-
initializeReplicaState: 初始化每個replica的狀態, 如果replica所在的broker是live狀態,則此replica的狀態為OnlineReplica。
-
處理可以轉換到Online狀態的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 並且傳送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
-
當建立某個topic時,該topic下所有分割槽的所有副本都是NonExistent。
-
當controller載入Zookeeper中該topic每一個分割槽的所有副本資訊到記憶體中,同時將副本的狀態變更為New。
-
之後controller選擇該分割槽副本列表中的第一個副本作為分割槽的leader副本並設定所有副本進入ISR,然後在Zookeeper中持久化該決定。
-
一旦確定了分割槽的Leader和ISR之後,controller會將這些訊息以請求的方式傳送給所有的副本。
-
同時將這些副本狀態同步到叢集的所有broker上以便讓他們知曉。
-
最後controller 會把分割槽的所有副本狀態設定為Online。
3 partitionStateMachine (根據副本分配方案建立分割槽)
-
Partition有如下四種狀態
NonExistentPartition: 這個partition還沒有被建立或者是建立後又被刪除了; NewPartition: 這個parition已建立, replicas也已分配好,但leader/isr還未就緒; OnlinePartition: 這個partition的leader選好; OfflinePartition: 這個partition的leader掛了,這個parition狀態為OfflinePartition; 複製程式碼
-
當建立Topic時,controller負責建立分割槽物件,它首先會短暫的將所有分割槽狀態設定為NonExistent。
-
之後讀取Zookeeper讀取副本分配方案,然後令分割槽狀態設定為NewPartion。
-
處於NewPartion狀態的分割槽尚未有leader和ISR,因此Controller會初始化leader和ISR資訊並設定分割槽狀態為OnlinePartion,此時分割槽正常工作。