1. 程式人生 > >kafka原始碼之kafkaserver的啟動

kafka原始碼之kafkaserver的啟動

KAFKA的啟動

Kafka啟動時,通過進入kafkabin路徑下,執行如下指令碼:

./kafka-server-start.sh ../config/server.properties

這個指令碼會啟動Kafka類的例項,並執行main函式,傳入的引數是server.properties的路徑.

defmain(args: Array[String]): Unit = {try {

載入對應的server.properties配置檔案,並生成Properties例項.val serverProps = getPropsFromArgs(args)

這裡生成一個KafkaServer的例項,這個例項生成時,會在例項中同時生成一個KafkaServer的例項,

生成KafkaServer例項前,需要先通過serverProps生成出一個KafkaConfig的例項.val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

新增對kill操作的勾子函式.用於處理,如果直接kill時關閉kafkaserver.// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread() {override def run() = {        kafkaServerStartable.shutdown

      }    })啟動並等待server停止.    kafkaServerStartable.startup    kafkaServerStartable.awaitShutdown  }catch {case e: Throwable =>      fatal(e)      System.exit(1)  }System.exit(0)}

根據properties生成server例項

KafkaServerStartable.fromProps(serverProps)函式呼叫時,也就是kakfa啟動時,

new KafkaServerStartable(KafkaConfig.

fromProps(serverProps))

KafkaServerStartable例項生成時,會生成KafkaServer例項:

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {private val server new KafkaServer(serverConfig)

KafkaConfig.fromProps(serverProps)的函式呼叫流程:

deffromProps(props: Properties): KafkaConfig =fromProps(propstrue)def fromProps(props: PropertiesdoLog: Boolean): KafkaConfig =new KafkaConfig(propsdoLog)

KafkaServer例項用於對所有的元件進行統一的初始化與啟動.

KafkaServer的啟動

Kafkamain函式中執行startup,會呼叫KafkaServer例項中的startup函式.

KafkaServer的例項生成時,會在每個logDir的目錄下生成一個meta.properties配置檔案,

這個檔案中主要記錄有這個kafka的版本與broker.id的值.

KafkaServer的例項啟動時,會生成kafka對外服務的socket server與相關元件,並對其進行啟動.

在執行startup函式時,下面分析下這個函式的具體的執行流程:

1,設定brokerState的狀態為Starting的狀態.

brokerState.newState(Starting)

2,啟動kafka的排程器,這個KafkaScheduler的例項生成時需要得到background.threads配置的值,預設是10個,用於配置後臺執行緒池的個數.

/* start scheduler */kafkaScheduler.startup()

3,初始化與zookeeper的連線,

這裡需要的配置項:

配置項zookeeper.connect,預設值localhost:2181.用於設定kafka連線的zookeeper的連線地址.

配置項zookeeper.session.timeout.ms,預設值6000ms,用於控制zk的session的超時時間,可設定為同步時間的2倍或3倍.

配置項zookeeper.connection.timeout.ms,預設值6000ms,用於配置連線zk的連線超時時間.

配置項zookeeper.sync.time.ms,預設值2000ms,用於與zk進行同步的時間間隔,

配置項zookeeper.set.acl,是否啟用zookeeper的acl控制,預設值為false,表示不啟用.

這裡得到的zkUtils例項是一個ZkUtils的例項,在例項生成後,會判斷zk中是否存在如下地址,如果不存在,會建立對應的路徑在zk上.

路徑/consumers,這個路徑用於消費者的client.id儲存對應消費的offset的路徑.

路徑/brokers/ids,這個路徑用於儲存所有的broker id的路徑.

路徑/brokers/topics,用於儲存每個broker對應的topics的資訊,

路徑/config/changes,還不知道,後期用到在說.

路徑/config/topics,還不知道,後期用到在說.

路徑/config/clients,還不知道,後期用到在說.

路徑/admin/delete_topics,用於儲存刪除的topic的資訊.

路徑/brokers/seqid,還不知道,後期用到在說.

路徑/isr_change_notification,這個用於在kafka的副本broker發生變化時用於通知的儲存路徑.

/* setup zookeeper */zkUtils = initZk()

4,初始化建立並啟動LogManager的例項,

/* start log manager */logManager = createLogManager(zkUtils.zkClientbrokerState)logManager.startup()

5,得到當前配置檔案中的brokerId的資訊.

如果broker.id的配置沒有配置(小於0的值時),同時broker.id.generation.enable配置為true,預設也就是true,這個時候根據zk中/brokers/seqid路徑的version值,第一次從0開始,每次增加.並加上reserved.broker.max.id配置的值,預設是1000,來充當這個server的broker.id,同時把這個broker.id更新到logDir目錄下的meta.properties檔案中,下次讀取時,直接讀取這個配置檔案中的broker.id的值,而不需要重新進行建立.

/* generate brokerId */config.brokerId =  getBrokerIdthis.logIdent "[Kafka Server " + config.brokerId "], "

6,生成並啟動kafka的SocketServer.

socketServer new SocketServer(configmetricskafkaMetricsTime)socketServer.startup()

7,生成並啟動ReplicaManager,此例項依賴kafkaScheduler與logManager例項.

/* start replica manager */replicaManager new ReplicaManager(configmetricstimekafkaMetricsTime

zkUtilskafkaSchedulerlogManager,isShuttingDown)replicaManager.startup()

8,生成並啟動KafkaController例項,此使用用於控制當前的broker中的所有的leader的partition的操作.

/* start kafka controller */kafkaController new KafkaController(configzkUtilsbrokerState,

kafkaMetricsTimemetricsthreadNamePrefix)kafkaController.startup()

9,生成並啟動GroupCoordinator的例項,這個是0.9新加入的一個玩意,用於對consumer中新加入的與partition的檢查,並對partition與consumer進行平衡操作.

/* start kafka coordinator */consumerCoordinator = GroupCoordinator.create(configzkUtilsreplicaManager)consumerCoordinator.startup()

10,根據authorizer.class.name配置項配置的Authorizer的實現類,生成一個用於認證的例項,用於對使用者的操作進行認證.這個預設為不認證.

/* Get the authorizer and initialize it if one is specified.*/authorizer Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)  authZ.configure(config.originals())  authZ}

11,生成用於對外對外提供服務的KafkaApis例項,並設定當前的broker的狀態為執行狀態.

/* start processing requests */apis new KafkaApis(socketServer.requestChannelreplicaManager

consumerCoordinator,kafkaControllerzkUtilsconfig.brokerIdconfigmetadataCachemetrics,

authorizer)requestHandlerPool new KafkaRequestHandlerPool(config.brokerIdsocketServer.requestChannelapisconfig.numIoThreads)brokerState.newState(RunningAsBroker)

12,生成動態配置修改的處理管理,主要是topic修改與client端配置的修改,並把已經存在的clientid對應的配置進行修改.

/* start dynamic config manager */dynamicConfigHandlers Map[StringConfigHandler](

ConfigType.Topic -> new TopicConfigHandler(logManager),ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)

)// TODO: Move this logic to DynamicConfigManagerAdminUtils.fetchAllEntityConfigs(zkUtilsConfigType.Client).foreach {case (clientIdproperties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientIdproperties)}// Create the config manager. start listening to notificationsdynamicConfigManager new DynamicConfigManager(zkUtilsdynamicConfigHandlers)dynamicConfigManager.startup()

13,生成kafka的心跳檢查處理工具,這裡需要使用到listeners的配置,根據是否在IAAS的環境下,需要使用到advertised相關配置,

如果advertised.listeners配置項存在,直接使用配置的listener,

否則,如果advertised.host.name配置項或者advertised.port配置項存在,使用這兩個配置項,並使用明文傳輸(PLAINTEXT://host:port),如果advertised.port沒有配置,直接使用port的配置,host可以沒有設定

最後,如果上面的不都滿足,直接使用listeners的配置.預設是PLAINTEXT://:port

/* tell everyone we are alive */val listeners = config.advertisedListeners.map {case(protocolendpoint) =>if (endpoint.port == 0)    (protocolEndPoint(endpoint.hostsocketServer.boundPort(protocol)endpoint.protocolType))else(protocolendpoint)}kafkaHealthcheck new KafkaHealthcheck(config.brokerIdlistenerszkUtils)kafkaHealthcheck.startup()