Spark Core Source Code Analysis: Starting a Spark Cluster (Part 2)
2.2 Starting the Worker
org.apache.spark.deploy.worker
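1 Entering through the main method of the Worker companion object
The main method first creates a SparkConf instance, conf. It then wraps conf and the arguments passed when starting the Worker into a WorkerArguments instance, args. Next it calls startSystemAndActor() to obtain the actorSystem instance; that method also creates and starts the Worker's own actor. The code is as follows:
The main method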
private[spark] object Worker extends Logging {
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    // Parse the arguments used to start the Worker service
    val args = new WorkerArguments(argStrings, conf)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }
The startSystemAndActor() method. How actorSystem.actorOf() creates and starts an actor was analyzed in the previous post on the Master startup flow.
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
    val actorName = "Worker"
    val securityMgr = new SecurityManager(conf)
    // Create the ActorSystem instance, which is used to create actors
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf, securityManager = securityMgr)
    // Convert every master URL into its Akka URL
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    // Create and start the Worker actor
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
    (actorSystem, boundPort)
  }
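The naming and URL conversion steps in startSystemAndActor() can be sketched roughly as follows. This is a minimal stand-alone model, not Spark's implementation: the object name WorkerNaming is hypothetical, and the Akka URL layout (the akka.tcp protocol, the sparkMaster system name, and the /user/Master path) is an assumption made for illustration.

```scala
object WorkerNaming {
  // Same expression as in startSystemAndActor(): no suffix when workerNumber is None.
  def systemName(workerNumber: Option[Int]): String =
    "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

  // Assumed layout: <protocol>://sparkMaster@<host>:<port>/user/Master
  def toAkkaUrl(sparkUrl: String, protocol: String = "akka.tcp"): String = {
    val uri = new java.net.URI(sparkUrl)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort != -1,
      s"Invalid master URL: $sparkUrl")
    s"$protocol://sparkMaster@${uri.getHost}:${uri.getPort}/user/Master"
  }

  def main(args: Array[String]): Unit = {
    println(systemName(None))                  // sparkWorker
    println(systemName(Some(1)))               // sparkWorker1
    println(toAkkaUrl("spark://master1:7077")) // akka.tcp://sparkMaster@master1:7077/user/Master
  }
}
```

Passing several master URLs through such a conversion yields one Akka address per master, which is what allows the Worker to contact every master in a high-availability setup.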
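2 Creating and running the Worker's actor
When the Worker actor instance is created, the code in the Worker class's primary constructor runs first. Once the actor has been started (actorOf() starts it asynchronously; Akka has no explicit actor.start() call), its lifecycle methods begin to run.
1) preStart() runs first. It calls registerWithMaster() to send registration information to the master. registerWithMaster() mainly calls tryRegisterAllMasters() and starts a timer that periodically retries registration with the master. The code is as follows:
The preStart() method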
  // Lifecycle method
  override def preStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    // Register with the Master
    registerWithMaster()
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }
The registerWithMaster() method; see the code comments for the detailed analysis
  // Register with the Master
  def registerWithMaster() {
    // DisassociatedEvent may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    // Why the pattern match: the handler for DisassociatedEvent also calls
    // registerWithMaster(), so matching on registrationRetryTimer ensures that no new
    // attempt is started while an earlier registration is still outstanding.
    registrationRetryTimer match {
      case None =>
        registered = false
        // Try to register with every Master, because the cluster may be running
        // in high-availability mode with more than one Master
        tryRegisterAllMasters()
        // Reset the connection attempt counter
        connectionAttemptCount = 0
        // Set registrationRetryTimer so that a later call to registerWithMaster()
        // does not run tryRegisterAllMasters() again
        registrationRetryTimer = Some {
          // Start a timer that periodically sends ReregisterWithMaster to this actor, so that
          // a registration lost to a network problem or a Master failure is retried
          context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
        }
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
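The guard in registerWithMaster() can be modelled without Akka to show why a second call is a no-op while a retry timer exists. The sketch below is a simplified stand-in, not the Worker's code: RetryTimer, WorkerModel, the sent log, and onRetryTick() are hypothetical names, and the way the timer is cleared after a successful registration is an assumption.

```scala
object RegistrationGuard {
  // Stand-in for the Cancellable returned by the Akka scheduler (hypothetical type).
  final class RetryTimer {
    var cancelled = false
    def cancel(): Unit = cancelled = true
  }

  class WorkerModel(masterUrls: Seq[String]) {
    var registered = false
    var connectionAttemptCount = 0
    var registrationRetryTimer: Option[RetryTimer] = None
    var sent = Vector.empty[String] // records every simulated RegisterWorker send

    private def tryRegisterAllMasters(): Unit =
      masterUrls.foreach(url => sent = sent :+ url)

    // Returns true only when a new registration attempt was actually started.
    def registerWithMaster(): Boolean = registrationRetryTimer match {
      case None =>
        registered = false
        tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(new RetryTimer)
        true
      case Some(_) =>
        false // an attempt is already scheduled, so do nothing
    }

    // Assumed retry behaviour: once registered, cancel and clear the timer; otherwise resend.
    def onRetryTick(): Unit =
      if (registered) {
        registrationRetryTimer.foreach(_.cancel())
        registrationRetryTimer = None
      } else {
        connectionAttemptCount += 1
        tryRegisterAllMasters()
      }
  }
}
```

Calling registerWithMaster() twice in a row on a fresh WorkerModel sends one message per master and then returns false, which mirrors the "Not spawning another attempt" branch.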
tryRegisterAllMasters()
  // Try to send a registration request to every master
  private def tryRegisterAllMasters() {
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      // Send the message to the actor; the actor here is the Master's actor
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }
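2) receiveWithLogging then runs, and the actor starts waiting for incoming messages. The messages relevant to Worker startup are RegisteredWorker and SendHeartbeat. Their roles are analyzed below together with the interaction with the Master.
2.3 The Master receives the worker's registration
When the Master receives the RegisterWorker message sent by a Worker, it first checks whether that worker is already registered. If it is, the Master replies to the worker with the failure message RegisterWorkerFailed("Duplicate worker ID"). Otherwise it wraps the information sent by the worker in a WorkerInfo object, saves it in memory and on disk, and replies with the success message RegisteredWorker(masterUrl, masterWebUiUrl), which carries the masterUrl and masterWebUiUrl. On receiving RegisteredWorker, the worker sets registered = true and calls changeMaster to record the active master's URL. It then starts a timer that periodically sends a message to itself, which in turn triggers sending a heartbeat to the master. The Worker-side code is as follows: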
  override def receiveWithLogging = {
    // The Master replied with its masterUrl, which means registration succeeded
    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true
      // Record the URLs that were sent back
      changeMaster(masterUrl, masterWebUiUrl)
      // Start a timer that drives heartbeats to the Master.
      // The timer sends a SendHeartbeat message to this actor itself;
      // when receiveWithLogging receives it, the handler checks whether the
      // connection to the master is still up and, if so, sends the heartbeat.
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
      if (CLEANUP_ENABLED) {
        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
      }
    case SendHeartbeat =>
      // If still connected, send a heartbeat to the Master
      if (connected) { master ! Heartbeat(workerId) }
    // ... handlers for other messages omitted
  }
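The Master-side decision described in section 2.3 can be sketched as a small stand-alone model. It is only an illustration of the duplicate check and the two possible replies: MasterModel, handleRegisterWorker, and workerCount are hypothetical names, the WorkerInfo here is a simplified stand-in for Spark's class, and persistence to disk is left out.

```scala
object MasterRegistration {
  sealed trait Reply
  case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends Reply
  case class RegisterWorkerFailed(message: String) extends Reply

  // Simplified stand-in for the information the Master keeps per worker.
  case class WorkerInfo(id: String, host: String, port: Int, cores: Int, memory: Int)

  class MasterModel(masterUrl: String, masterWebUiUrl: String) {
    private val idToWorker = scala.collection.mutable.HashMap.empty[String, WorkerInfo]

    // Reject an ID that is already known; otherwise record the worker and confirm.
    def handleRegisterWorker(id: String, host: String, port: Int,
                             cores: Int, memory: Int): Reply =
      if (idToWorker.contains(id)) {
        RegisterWorkerFailed("Duplicate worker ID")
      } else {
        idToWorker(id) = WorkerInfo(id, host, port, cores, memory)
        RegisteredWorker(masterUrl, masterWebUiUrl)
      }

    def workerCount: Int = idToWorker.size
  }
}
```

Registering the same worker ID twice against one MasterModel returns RegisteredWorker the first time and RegisterWorkerFailed("Duplicate worker ID") the second time, leaving the worker count at one.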