1. 程式人生 > >kafka叢集Controller競選與責任設計思路架構詳解-kafka 商業環境實戰

kafka叢集Controller競選與責任設計思路架構詳解-kafka 商業環境實戰

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

1 無所不能的Controller

  • 某一個broker被選舉出來承擔特殊的角色,就是控制器Controller。

  • Leader會向zookeeper上註冊Watcher,其他broker幾乎不用監聽zookeeper的狀態變化。

  • Controller叢集就是用來管理和協調Kafka叢集的,具體就是管理叢集中所有分割槽的狀態和分割槽對應副本的狀態。

  • 每一個Kafka叢集任意時刻都只能有一個controller,當叢集啟動的時候,所有的broker都會參與到controller的競選,最終只能有一個broker勝出。

  • Controller維護的狀態分為兩類:1:管理每一臺Broker上對應的分割槽副本。2:管理每一個Topic分割槽的狀態。

  • KafkaController 核心程式碼,其中包含副本狀態機和分割槽狀態機

      class KafkaController(val config : KafkaConfig, zkClient: ZkClient, 
      val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
          this.logIdent = "[Controller " + config.brokerId + "]: "
          private var isRunning = true
          private val stateChangeLogger = KafkaController.stateChangeLogger
          val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
    
          val partitionStateMachine = new PartitionStateMachine(this)
          val replicaStateMachine = new ReplicaStateMachine(this)
          
          private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
          onControllerResignation, config.brokerId)
          // have a separate scheduler for the controller to be able to start and stop independently of the
          // kafka server
          private val autoRebalanceScheduler = new KafkaScheduler(1)
          var deleteTopicManager: TopicDeletionManager = null
          val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
          private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
          private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
          private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
          private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
          
          private val partitionReassignedListener = new PartitionsReassignedListener(this)
          private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
    複製程式碼
  • KafkaController中共定義了五種selector選舉器

      1、ReassignedPartitionLeaderSelector
      從可用的ISR中選取第一個作為leader,把當前的ISR作為新的ISR,將重分配的副本集合作為接收LeaderAndIsr請求的副本集合。
      2、PreferredReplicaPartitionLeaderSelector
      如果從assignedReplicas取出的第一個副本就是分割槽leader的話,則丟擲異常,否則將第一個副本設定為分割槽leader。
      3、ControlledShutdownLeaderSelector
      將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,然後選取第一個副本作為leader,然後令當前AR作為接收LeaderAndIsr請求的副本。
      4、NoOpLeaderSelector
      原則上不做任何事情,返回當前的leader和isr。
      5、OfflinePartitionLeaderSelector
      從活著的ISR中選擇一個broker作為leader,如果ISR中沒有活著的副本,則從assignedReplicas中選擇一個副本作為leader,leader選舉成功後註冊到Zookeeper中,並更新所有的快取。
    複製程式碼
  • kafka修改分割槽和副本數

      ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe  --topic test1
      
      Topic:test1       PartitionCount:3        ReplicationFactor:2     Configs:
      Topic: test1      Partition: 0    Leader: 2       Replicas: 2,4   Isr: 2,4
      Topic: test1      Partition: 1    Leader: 3       Replicas: 3,5   Isr: 3,5
      Topic: test1      Partition: 2    Leader: 4       Replicas: 4,1   Isr: 4,1
    複製程式碼
  • topic 分割槽擴容

      ./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
    複製程式碼

2 ReplicaStateMachine (ZK持久化副本分配方案)

  • Replica有7種狀態:

      1 NewReplica: 在partition reassignment期間KafkaController建立New replica
      
      2 OnlineReplica: 當一個replica變為一個parition的assingned replicas時
      其狀態變為OnlineReplica, 即一個有效的OnlineReplica
      
      3 Online狀態的parition才能轉變為leader或isr中的一員
      
      4 OfflineReplica: 當一個broker down時, 上面的replica也隨之die, 其狀態轉變為Onffline;
      ReplicaDeletionStarted: 當一個replica的刪除操作開始時,其狀態轉變為ReplicaDeletionStarted
      
      5 ReplicaDeletionSuccessful: Replica成功刪除後,其狀態轉變為ReplicaDeletionSuccessful
      
      6 ReplicaDeletionIneligible: Replica成功失敗後,其狀態轉變為ReplicaDeletionIneligible
      
      7 NonExistentReplica:  Replica成功刪除後, 從ReplicaDeletionSuccessful狀態轉變為NonExistentReplica狀態
    複製程式碼
  • ReplicaStateMachine 所在檔案: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala

  • startup: 啟動ReplicaStateMachine

  • initializeReplicaState: 初始化每個replica的狀態, 如果replica所在的broker是live狀態,則此replica的狀態為OnlineReplica。

  • 處理可以轉換到Online狀態的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 並且傳送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)

  • 當建立某個topic時,該topic下所有分割槽的所有副本都是NonExistent。

  • 當controller載入Zookeeper中該topic每一個分割槽的所有副本資訊到記憶體中,同時將副本的狀態變更為New。

  • 之後controller選擇該分割槽副本列表中的第一個副本作為分割槽的leader副本並設定所有副本進入ISR,然後在Zookeeper中持久化該決定。

  • 一旦確定了分割槽的Leader和ISR之後,controller會將這些訊息以請求的方式傳送給所有的副本。

  • 同時將這些副本狀態同步到叢集的所有broker上以便讓他們知曉。

  • 最後controller 會把分割槽的所有副本狀態設定為Online。

3 partitionStateMachine (根據副本分配方案建立分割槽)

  • Partition有如下四種狀態

      NonExistentPartition: 這個partition還沒有被建立或者是建立後又被刪除了;
      NewPartition: 這個parition已建立, replicas也已分配好,但leader/isr還未就緒;
      OnlinePartition: 這個partition的leader選好;
      OfflinePartition: 這個partition的leader掛了,這個parition狀態為OfflinePartition;
    複製程式碼
  • 當建立Topic時,controller負責建立分割槽物件,它首先會短暫的將所有分割槽狀態設定為NonExistent。

  • 之後讀取Zookeeper副本分配方案,然後令分割槽狀態設定為NewPartion。

  • 處於NewPartion狀態的分割槽尚未有leader和ISR,因此Controller會初始化leader和ISR資訊並設定分割槽狀態為OnlinePartion,此時分割槽正常工作。

  • 本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

4 Controller職責所在(監聽znode狀態變化做執行)

  • UpdateMetadataRequest:更新元資料請求(比如:topic有多少個分割槽,每一個分割槽的leader在哪一臺broker上以及分割槽的副本列表),隨著叢集的執行,這部分資訊隨時都可能變更,一旦發生變更,controller會將最新的元資料廣播給所有存活的broker。具體方式就是給所有broker傳送UpdateMetadataRequest請求
  • CreateTopics: 建立topic請求。當前不管是通過API方式、指令碼方式(--create)抑或是CreateTopics請求方式來建立topic,做法幾乎都是在Zookeeper的/brokers/topics下建立znode來觸發建立邏輯,而controller會監聽該path下的變更來執行真正的“建立topic”邏輯
  • DeleteTopics:刪除topic請求。和CreateTopics類似,也是通過建立Zookeeper下的/admin/delete_topics/節點來觸發刪除topic,主要邏輯有:1:停止所有副本執行。2:刪除所有副本的日誌資料。3:移除zk上的 /admin/delete_topics/節點。
  • 分割槽重分配:即kafka-reassign-partitions指令碼做的事情。同樣是與Zookeeper結合使用,指令碼寫入/admin/reassign_partitions節點來觸發,controller負責按照方案分配分割槽。執行過程是:先擴充套件再伸縮機制(就副本和新副本集合同時存在)。
  • Preferred leader分配:調整分割槽leader副本,preferred leader選舉當前有兩種觸發方式:1. 自動觸發(auto.leader.rebalance.enable = true),controller會自動調整Preferred leader。2. kafka-preferred-replica-election指令碼觸發。兩者步驟相同,都是向Zookeeper的/admin/preferred_replica_election寫資料,controller提取資料執行preferred leader分配
  • 分割槽擴充套件:即增加topic分割槽數。標準做法也是通過kafka-reassign-partitions指令碼完成,不過使用者可直接往Zookeeper中寫資料來實現,比如直接把新增分割槽的副本集合寫入到/brokers/topics/下,然後controller會為你自動地選出leader並增加分割槽
  • 叢集擴充套件:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工作
  • broker崩潰:同樣地,controller通過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可立即感知併為受影響分割槽選舉新的leader
  • ControlledShutdown:broker除了崩潰,還能“優雅”地退出。broker一旦自行終止,controller會接收到一個ControlledShudownRequest請求,然後controller會妥善處理該請求並執行各種收尾工作
  • Controller leader選舉:controller必然要提供自己的leader選舉以防這個全域性唯一的元件崩潰宕機導致服務中斷。這個功能也是通過Zookeeper的幫助實現的。

5 Controller與Broker之間的通訊機制(NIO select)

  • controller啟動時會為叢集中的所有Broker建立一個專屬的Socket連線,加入有100臺broker機器,那麼controller會建立100個Socket連線。新版本目前統一使用NIO select ,實際上還是要維護100個執行緒。

6 ControllerContext資料元件

  • controller的快取,可謂是最重要的資料元件了,ControllerContext彙總了Zookeeper中關於kafka叢集中所有元資料資訊,是controller能夠正確提供服務的基礎。

本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。

7 總結

kafka叢集Controller主要幹通過ZK持久化副本分配方案,根據副本分配方案建立分割槽,監聽ZK znode狀態變化做執行處理,維護分割槽和副本ISR機制穩定執行。感謝huxihx技術部落格以及相關書籍,讓我理解了Controller核心機制,寫一篇學習筆記,作為總結,辛苦成文,實屬不易,謝謝。

秦凱新 於深圳 201812021541