
Spark Source Code Study (2): Master and Worker Startup and the Actor Communication Flow

In Spark Source Code Study (1), we followed Spark's startup scripts and saw that starting the Master actually launches org.apache.spark.deploy.master.Master. In this post we start from that class and its counterpart, org.apache.spark.deploy.worker.Worker, and read through the source to understand Spark's startup flow.

1. First, let's look at org.apache.spark.deploy.master.Master:

(1) Starting from Master's main method:

val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)

(2) The key code in the startSystemAndActor method:
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  securityManager = securityMgr)
val actor = actorSystem.actorOf(
  Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

(3) The key code in createActorSystem:
val startService: Int => (ActorSystem, Int) = { actualPort =>
  doCreateActorSystem(name, host, actualPort, conf, securityManager)
}

When this function runs, it returns an ActorSystem together with the port it actually bound to.
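The `startService` closure follows a small pattern: a function from a candidate port to a started service plus the port actually bound, which the caller can retry on successive ports when binding fails. A self-contained sketch of that pattern in plain Scala (`tryBind` and the retry count are illustrative stand-ins, not Spark's actual API):

```scala
import scala.util.{Try, Success, Failure}

object PortRetry {
  // Hypothetical bind: fails for ports below 7000, to simulate conflicts.
  def tryBind(port: Int): Try[Int] =
    if (port >= 7000) Success(port)
    else Failure(new java.net.BindException(s"port $port in use"))

  // Mirrors the startService idea: Int => (service, boundPort),
  // retried on the next port after each failure.
  def startOnPort(desired: Int, maxRetries: Int = 3): (String, Int) = {
    def attempt(offset: Int): (String, Int) =
      if (offset > maxRetries)
        throw new java.net.BindException(s"no free port after $maxRetries retries")
      else tryBind(desired + offset) match {
        case Success(p) => (s"system@$p", p)
        case Failure(_) => attempt(offset + 1) // walk to the next port
      }
    attempt(0)
  }
}
```

Under these fake bind rules, `PortRetry.startOnPort(6998)` walks forward and binds at 7000; the real createActorSystem returns the ActorSystem and the bound port in the same tuple style.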

(4) In step (2), the classOf[Master] argument passed to actorSystem.actorOf is the equivalent of Master.class in Java. Creating the actor invokes Master's constructor and its lifecycle methods, including preStart():

context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis,
  self, CheckForWorkerTimeOut)

This line starts a timer that repeatedly sends the Master itself a CheckForWorkerTimeOut (worker-timeout check) message. Looking at the source, CheckForWorkerTimeOut is defined in MasterMessages as:

case object CheckForWorkerTimeOut

(5) In Master's receiveWithLogging method:
case CheckForWorkerTimeOut => {
  timeOutDeadWorkers()
}
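The timeOutDeadWorkers() call above sweeps the Master's in-memory worker table and drops every worker whose last heartbeat is older than the timeout. A simplified, self-contained sketch of that idea (the class and field names here are illustrative stand-ins; the real Master also updates persistence and scheduling state):

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's WorkerInfo.
case class WorkerInfo(id: String, var lastHeartbeat: Long)

class TimeoutSweeper(workerTimeoutMs: Long) {
  val idToWorker = mutable.Map[String, WorkerInfo]()

  // Remove every worker whose last heartbeat is older than the timeout,
  // mirroring what timeOutDeadWorkers does to the in-memory state.
  def timeOutDeadWorkers(now: Long): Seq[String] = {
    val dead = idToWorker.values
      .filter(w => now - w.lastHeartbeat > workerTimeoutMs).toSeq
    dead.foreach(w => idToWorker -= w.id)
    dead.map(_.id)
  }
}
```

With a 60-second timeout, a worker last heard from at t=0 is removed at t=70s while one heard from at t=50s survives.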

This is how the Master handles the timeout-check message. Reading timeOutDeadWorkers shows that it removes any timed-out Workers from the Master's in-memory state.

(6) Next, let's look at how a Worker registers itself. In org.apache.spark.deploy.worker.Worker, start from the preStart() method: registerWithMaster() sends a registration message to the Master. The key code:
registrationRetryTimer = Some {
  context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
    INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}

This periodically sends the Worker itself a ReregisterWithMaster message, which is handled by:

case ReregisterWithMaster =>
  reregisterWithMaster()

reregisterWithMaster() in turn sends the actual registration message to the Master:
master ! RegisterWorker(
  workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

(7) The Master receives the RegisterWorker message and handles it:
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
...... if this node has not registered before, record its info in memory and reply with a registration-success message; otherwise reply with a failure:
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
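Stripped of the persistence and scheduling calls, the RegisterWorker branch boils down to a membership check on the in-memory worker table. A minimal sketch of that decision (the reply types mirror Spark's deploy messages in spirit, but their exact shape here is an assumption):

```scala
import scala.collection.mutable

sealed trait RegistrationReply
case class RegisteredWorker(masterUrl: String) extends RegistrationReply
case class RegisterWorkerFailed(reason: String) extends RegistrationReply

class RegistrationTable(masterUrl: String) {
  private val idToWorker = mutable.Map[String, String]() // workerId -> host:port

  // If the worker is unknown, record it and reply success; otherwise fail,
  // mirroring the Master's handling of RegisterWorker.
  def register(workerId: String, hostPort: String): RegistrationReply =
    if (idToWorker.contains(workerId))
      RegisterWorkerFailed(s"Duplicate worker ID: $workerId")
    else {
      idToWorker += (workerId -> hostPort)
      // persistenceEngine.addWorker(...) and schedule() would happen here.
      RegisteredWorker(masterUrl)
    }
}
```

The first registration for an ID succeeds; a second attempt with the same ID is rejected.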

(8) The Worker receives the RegisteredWorker success message, updates the Master's address, and schedules a periodic heartbeat message to itself:

case RegisteredWorker(masterUrl, masterWebUiUrl) =>
  changeMaster(masterUrl, masterWebUiUrl)
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis,
    self, SendHeartbeat)

When the Worker receives its own SendHeartbeat message, it checks that it is still connected to the Master and, if so, sends the Master a heartbeat:
case SendHeartbeat =>
  if (connected) { master ! Heartbeat(workerId) }

(9) The Master receives the Worker's heartbeat. If the Worker exists in the Master's memory, its last-heartbeat timestamp is updated; otherwise the Master asks the Worker to re-register:

case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
  case Some(workerInfo) =>
    workerInfo.lastHeartbeat = System.currentTimeMillis()
  case None =>
    if (workers.map(_.id).contains(workerId)) {
      logWarning(s"Got heartbeat from unregistered worker $workerId." +
        " Asking it to re-register.")
      sender ! ReconnectWorker(masterUrl)
    } else {
      logWarning(s"Got heartbeat from unregistered worker $workerId." +
        " This worker was never registered, so ignoring the heartbeat.")
    }
}
}

(10) If the Worker receives a ReconnectWorker message, it re-registers with the Master:
case ReconnectWorker(masterUrl) =>
  logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
  registerWithMaster()
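Putting steps (6) through (10) together, the whole handshake can be simulated without Akka by passing the same style of case-class messages between two plain objects. A toy sketch under that assumption, not Spark code:

```scala
import scala.collection.mutable

sealed trait Msg
case class RegisterWorker(id: String) extends Msg
case class RegisteredWorker(masterUrl: String) extends Msg
case class Heartbeat(id: String) extends Msg
case class ReconnectWorker(masterUrl: String) extends Msg

// Toy Master: registers workers, refreshes heartbeats, and asks unknown
// heartbeat senders to re-register, as in step (9).
class ToyMaster(url: String) {
  val idToWorker = mutable.Map[String, Long]() // workerId -> lastHeartbeat
  def receive(msg: Msg, now: Long): Option[Msg] = msg match {
    case RegisterWorker(id) =>
      idToWorker(id) = now
      Some(RegisteredWorker(url))
    case Heartbeat(id) if idToWorker.contains(id) =>
      idToWorker(id) = now // known worker: refresh the timestamp
      None
    case Heartbeat(_) =>
      Some(ReconnectWorker(url)) // unknown worker: ask it to re-register
    case _ => None
  }
}

// Toy Worker: marks itself connected on success, re-registers on request.
class ToyWorker(id: String) {
  var connected = false
  def receive(msg: Msg): Option[Msg] = msg match {
    case RegisteredWorker(_) => connected = true; None
    case ReconnectWorker(_)  => Some(RegisterWorker(id))
    case _ => None
  }
}
```

Feeding a RegisterWorker through the toy Master yields a RegisteredWorker reply that flips the Worker to connected; a heartbeat from an ID the Master never saw comes back as ReconnectWorker, closing the loop.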

That covers the main flow of starting Spark's Master and Worker and the Actor communication between them!